Implement the task management in retention manager

Implement the task management in retention manager

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-07-23 09:40:37 +08:00
parent 388f8311f5
commit 03cc8046eb
8 changed files with 178 additions and 35 deletions

View File

@ -39,12 +39,12 @@ create table retention_execution
create table retention_task
(
id integer PRIMARY KEY NOT NULL,
id SERIAL NOT NULL,
execution_id integer,
rule_id integer,
rule_display_text varchar(255),
artifact varchar(255),
timestamp time
status varchar(32),
start_time timestamp default CURRENT_TIMESTAMP,
end_time timestamp default CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table schedule

View File

@ -1,11 +1,12 @@
package controllers
import (
"time"
"github.com/goharbor/harbor/src/core/api"
"github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/q"
"time"
)
// RetentionAPI ...
@ -155,15 +156,15 @@ func (r *RetentionAPI) ListRetentionExecHistory() {
r.SendInternalServerError(err)
return
}
query := &q.Query{
PageNumber: page,
PageSize: size,
}
his, err := r.manager.ListHistories(eid, query)
tasks, err := r.manager.ListTasks(&q.TaskQuery{
ExecutionID: eid,
PageNumber: page,
PageSize: size,
})
if err != nil {
r.SendBadRequestError(err)
return
}
r.Data["json"] = his
r.Data["json"] = tasks
r.ServeJSON()
}

View File

@ -1,8 +1,9 @@
package models
import (
"github.com/astaxie/beego/orm"
"time"
"github.com/astaxie/beego/orm"
)
func init() {
@ -45,6 +46,7 @@ type RetentionExecution struct {
EndTime time.Time
}
/*
// RetentionTask Retention Task
type RetentionTask struct {
ID int64
@ -54,6 +56,16 @@ type RetentionTask struct {
Artifact string
Timestamp time.Time
}
*/
// RetentionTask ...
type RetentionTask struct {
ID int64 `orm:"pk;auto;column(id)"`
ExecutionID int64 `orm:"column(execution_id)"`
Status string `orm:"column(status)"`
StartTime time.Time `orm:"column(start_time)"`
EndTime time.Time `orm:"column(end_time)"`
}
// RetentionScheduleJob Retention Schedule Job
type RetentionScheduleJob struct {

View File

@ -1,6 +1,9 @@
package dao
import (
"errors"
"fmt"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/q"
@ -87,6 +90,7 @@ func ListExecutions(query *q.Query) ([]*models.RetentionExecution, error) {
return execs, nil
}
/*
// ListExecHistories List Execution Histories
func ListExecHistories(executionID int64, query *q.Query) ([]*models.RetentionTask, error) {
o := dao.GetOrmer()
@ -106,3 +110,55 @@ func AppendExecHistory(t *models.RetentionTask) (int64, error) {
o := dao.GetOrmer()
return o.Insert(t)
}
*/
// CreateTask creates task record in database
func CreateTask(task *models.RetentionTask) (int64, error) {
if task == nil {
return 0, errors.New("nil task")
}
return dao.GetOrmer().Insert(task)
}
// UpdateTask updates the task record in database
func UpdateTask(task *models.RetentionTask, cols ...string) error {
if task == nil {
return errors.New("nil task")
}
if task.ID <= 0 {
return fmt.Errorf("invalid task ID: %d", task.ID)
}
_, err := dao.GetOrmer().Update(task, cols...)
return err
}
// DeleteTask deletes the task record specified by ID in database
func DeleteTask(id int64) error {
_, err := dao.GetOrmer().Delete(&models.RetentionTask{
ID: id,
})
return err
}
// ListTask lists the tasks according to the query
func ListTask(query ...*q.TaskQuery) ([]*models.RetentionTask, error) {
qs := dao.GetOrmer().QueryTable(&models.RetentionTask{})
if len(query) > 0 && query[0] != nil {
q := query[0]
if q.ExecutionID > 0 {
qs = qs.Filter("ExecutionID", q.ExecutionID)
}
if len(q.Status) > 0 {
qs = qs.Filter("Status", q.Status)
}
if q.PageSize > 0 {
qs = qs.Limit(q.PageSize)
if q.PageNumber > 0 {
qs = qs.Offset((q.PageNumber - 1) * q.PageSize)
}
}
}
tasks := []*models.RetentionTask{}
_, err := qs.All(&tasks)
return tasks, err
}

View File

@ -2,16 +2,18 @@ package dao
import (
"encoding/json"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/stretchr/testify/assert"
"os"
"strings"
"testing"
"time"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/q"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMain(m *testing.M) {
@ -96,3 +98,33 @@ func TestPolicy(t *testing.T) {
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "no row found"))
}
func TestTask(t *testing.T) {
task := &models.RetentionTask{
ExecutionID: 1,
Status: "pending",
}
// create
id, err := CreateTask(task)
require.Nil(t, err)
// update
task.ID = id
task.Status = "running"
err = UpdateTask(task, "Status")
require.Nil(t, err)
// list
tasks, err := ListTask(&q.TaskQuery{
ExecutionID: 1,
Status: "running",
})
require.Nil(t, err)
require.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, "running", tasks[0].Status)
// delete
err = DeleteTask(id)
require.Nil(t, err)
}

View File

@ -19,14 +19,13 @@ import (
"strconv"
"testing"
"github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy/lwp"
"github.com/goharbor/harbor/src/chartserver"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/pkg/project"
"github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/lwp"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/q"
"github.com/goharbor/harbor/src/pkg/retention/res"
@ -116,13 +115,13 @@ func (f *fakeRetentionManager) UpdateExecution(execution *Execution) error {
func (f *fakeRetentionManager) GetExecution(eid int64) (*Execution, error) {
return nil, nil
}
func (f *fakeRetentionManager) ListTasks(query *q.Query) ([]*Task, error) {
func (f *fakeRetentionManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
return nil, nil
}
func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) {
return 0, nil
}
func (f *fakeRetentionManager) UpdateTask(task *Task) error {
func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error {
return nil
}
func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) {

View File

@ -16,6 +16,8 @@ package retention
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/goharbor/harbor/src/pkg/retention/dao"
@ -42,20 +44,16 @@ type Manager interface {
UpdateExecution(execution *Execution) error
// Get the specified execution
GetExecution(eid int64) (*Execution, error)
// List execution histories
ListExecutions(query *q.Query) ([]*Execution, error)
// List tasks histories
ListTasks(query *q.Query) ([]*Task, error)
ListTasks(query ...*q.TaskQuery) ([]*Task, error)
// Create a new retention task
CreateTask(task *Task) (int64, error)
// Update the specified task
UpdateTask(task *Task) error
UpdateTask(task *Task, cols ...string) error
// Get the log of the specified task
GetTaskLog(taskID int64) ([]byte, error)
// List execution histories
ListExecutions(query *q.Query) ([]*Execution, error)
// Add new history
AppendHistory(history *History) error
// List all the histories marked by the specified execution
ListHistories(executionID int64, query *q.Query) ([]*History, error)
}
// DefaultManager ...
@ -161,17 +159,52 @@ func (d *DefaultManager) GetExecution(eid int64) (*Execution, error) {
// CreateTask creates task record
func (d *DefaultManager) CreateTask(task *Task) (int64, error) {
panic("implement me")
if task == nil {
return 0, errors.New("nil task")
}
t := &models.RetentionTask{
ExecutionID: task.ExecutionID,
Status: task.Status,
StartTime: task.StartTime,
EndTime: task.EndTime,
}
return dao.CreateTask(t)
}
// ListTasks lists tasks according to the query
func (d *DefaultManager) ListTasks(query *q.Query) ([]*Task, error) {
panic("implement me")
func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
ts, err := dao.ListTask(query...)
if err != nil {
return nil, err
}
tasks := []*Task{}
for _, t := range ts {
tasks = append(tasks, &Task{
ID: t.ID,
ExecutionID: t.ExecutionID,
Status: t.Status,
StartTime: t.StartTime,
EndTime: t.EndTime,
})
}
return tasks, nil
}
// UpdateTask updates the task
func (d *DefaultManager) UpdateTask(task *Task) error {
panic("implement me")
func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
if task == nil {
return errors.New("nil task")
}
if task.ID <= 0 {
return fmt.Errorf("invalid task ID: %d", task.ID)
}
return dao.UpdateTask(&models.RetentionTask{
ID: task.ID,
ExecutionID: task.ExecutionID,
Status: task.Status,
StartTime: task.StartTime,
EndTime: task.EndTime,
}, cols...)
}
// GetTaskLog gets the logs of task
@ -179,6 +212,7 @@ func (d *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) {
panic("implement me")
}
/*
// ListHistories List Histories
func (d *DefaultManager) ListHistories(executionID int64, query *q.Query) ([]*History, error) {
his, err := dao.ListExecHistories(executionID, query)
@ -209,6 +243,7 @@ func (d *DefaultManager) AppendHistory(h *History) error {
_, err := dao.AppendExecHistory(h1)
return err
}
*/
// NewManager ...
func NewManager() Manager {

View File

@ -19,3 +19,11 @@ type Query struct {
PageNumber int64
PageSize int64
}
// TaskQuery parameters
type TaskQuery struct {
ExecutionID int64
Status string
PageNumber int64
PageSize int64
}