fix(retention) migrate sql error

Signed-off-by: Ziming Zhang <zziming@vmware.com>
This commit is contained in:
Ziming Zhang 2021-02-04 19:01:43 +08:00 committed by Ziming
parent 24ec772978
commit ec83f49a1a
7 changed files with 33 additions and 413 deletions

View File

@ -604,22 +604,37 @@ UPDATE retention_execution SET new_execution_id=new_exec_id WHERE id=rep_exec.id
END LOOP;
END $$;
/*move the replication task records into the new task table*/
/*move the retention task records into the new task table*/
DO $$
DECLARE
rep_task RECORD;
status_code integer;
status_revision integer;
BEGIN
FOR rep_task IN SELECT * FROM retention_task
LOOP
IF rep_task.status = 'Pending' THEN
status_code=0;
ELSIF rep_task.status = 'Scheduled' THEN
status_code=1;
ELSIF rep_task.status = 'Running' THEN
status_code=2;
ELSE
status_code=3;
END IF;
IF rep_task.status_revision is not null THEN
status_revision=rep_task.status_revision;
ELSE
status_revision=0;
END IF;
INSERT INTO task (vendor_type, execution_id, job_id, status, status_code, status_revision,
run_count, extra_attrs, creation_time, start_time, update_time, end_time)
VALUES ('RETENTION', (SELECT new_execution_id FROM retention_execution WHERE id=rep_task.execution_id),
rep_task.job_id, rep_task.status, rep_task.status_code, rep_task.status_revision,
rep_task.job_id, rep_task.status, status_code, status_revision,
1, CONCAT('{"total":"', rep_task.total,'","retained":"', rep_task.retained,'"}')::json,
rep_task.start_time, rep_task.start_time, rep_task.end_time, rep_task.end_time);
END LOOP;
END $$;
DROP TABLE IF EXISTS replication_task;
DROP TABLE IF EXISTS replication_execution;
DROP TABLE IF EXISTS retention_task;
DROP TABLE IF EXISTS retention_execution;

View File

@ -85,7 +85,7 @@ type defaultController struct {
const (
// SchedulerCallback ...
SchedulerCallback = "RETENTION"
schedulerVendorType = "RETENTION"
schedulerVendorType = job.Retention
)
// TriggerParam ...
@ -199,7 +199,7 @@ func (r *defaultController) DeleteRetention(ctx context.Context, id int64) error
if err != nil {
return err
}
return r.manager.DeletePolicyAndExec(id)
return r.manager.DeletePolicy(id)
}
// deleteExecs delete executions
@ -251,7 +251,6 @@ func (r *defaultController) TriggerRetentionExec(ctx context.Context, policyID i
}
}
return id, err
}
// OperateRetentionExec Operate Retention Execution

View File

@ -1,16 +1,8 @@
package dao
import (
"errors"
"fmt"
"strconv"
"time"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/q"
)
// CreatePolicy Create Policy
@ -26,20 +18,13 @@ func UpdatePolicy(p *models.RetentionPolicy, cols ...string) error {
return err
}
// DeletePolicyAndExec Delete Policy and Exec
func DeletePolicyAndExec(id int64) error {
// DeletePolicy Update Policy
func DeletePolicy(id int64) error {
o := dao.GetOrmer()
if _, err := o.Raw("delete from retention_task where execution_id in (select id from retention_execution where policy_id = ?) ", id).Exec(); err != nil {
return nil
}
if _, err := o.Delete(&models.RetentionExecution{
PolicyID: id,
}); err != nil {
return err
}
_, err := o.Delete(&models.RetentionPolicy{
p := &models.RetentionPolicy{
ID: id,
})
}
_, err := o.Delete(p)
return err
}
@ -54,258 +39,3 @@ func GetPolicy(id int64) (*models.RetentionPolicy, error) {
}
return p, nil
}
// CreateExecution Create Execution
func CreateExecution(e *models.RetentionExecution) (int64, error) {
o := dao.GetOrmer()
return o.Insert(e)
}
// UpdateExecution Update Execution
func UpdateExecution(e *models.RetentionExecution, cols ...string) error {
o := dao.GetOrmer()
_, err := o.Update(e, cols...)
return err
}
// DeleteExecution Delete Execution
func DeleteExecution(id int64) error {
o := dao.GetOrmer()
_, err := o.Delete(&models.RetentionTask{
ExecutionID: id,
})
if err != nil {
return err
}
_, err = o.Delete(&models.RetentionExecution{
ID: id,
})
return err
}
// GetExecution Get Execution
func GetExecution(id int64) (*models.RetentionExecution, error) {
o := dao.GetOrmer()
e := &models.RetentionExecution{
ID: id,
}
if err := o.Read(e); err != nil {
return nil, err
}
if err := fillStatus(e); err != nil {
return nil, err
}
return e, nil
}
// fillStatus the priority is InProgress Stopped Failed Succeed
func fillStatus(exec *models.RetentionExecution) error {
o := dao.GetOrmer()
var r orm.Params
if _, err := o.Raw("select status, count(*) num from retention_task where execution_id = ? group by status", exec.ID).
RowsToMap(&r, "status", "num"); err != nil {
return err
}
var (
total, running, succeed, failed, stopped int64
)
for k, s := range r {
v, err := strconv.ParseInt(s.(string), 10, 64)
if err != nil {
return err
}
total += v
switch k {
case job.ScheduledStatus.String():
running += v
case job.PendingStatus.String():
running += v
case job.RunningStatus.String():
running += v
case job.SuccessStatus.String():
succeed += v
case job.StoppedStatus.String():
stopped += v
case job.ErrorStatus.String():
failed += v
}
}
if total == 0 {
exec.Status = models.ExecutionStatusSucceed
exec.EndTime = exec.StartTime
return nil
}
if running > 0 {
exec.Status = models.ExecutionStatusInProgress
} else if stopped > 0 {
exec.Status = models.ExecutionStatusStopped
} else if failed > 0 {
exec.Status = models.ExecutionStatusFailed
} else {
exec.Status = models.ExecutionStatusSucceed
}
if exec.Status != models.ExecutionStatusInProgress {
if err := o.Raw("select max(end_time) from retention_task where execution_id = ?", exec.ID).
QueryRow(&exec.EndTime); err != nil {
return err
}
}
return nil
}
// ListExecutions List Executions
func ListExecutions(policyID int64, query *q.Query) ([]*models.RetentionExecution, error) {
o := dao.GetOrmer()
qs := o.QueryTable(new(models.RetentionExecution))
qs = qs.Filter("policy_id", policyID)
qs = qs.OrderBy("-id")
if query != nil {
qs = qs.Limit(query.PageSize, (query.PageNumber-1)*query.PageSize)
}
var execs []*models.RetentionExecution
_, err := qs.All(&execs)
if err != nil {
return nil, err
}
for _, e := range execs {
if err := fillStatus(e); err != nil {
return nil, err
}
}
return execs, nil
}
// GetTotalOfRetentionExecs Count Executions
func GetTotalOfRetentionExecs(policyID int64) (int64, error) {
o := dao.GetOrmer()
qs := o.QueryTable(new(models.RetentionExecution))
qs = qs.Filter("policy_id", policyID)
return qs.Count()
}
/*
// ListExecHistories List Execution Histories
func ListExecHistories(executionID int64, query *q.Query) ([]*models.RetentionTask, error) {
o := dao.GetOrmer()
qs := o.QueryTable(new(models.RetentionTask))
qs = qs.Filter("Execution_ID", executionID)
if query != nil {
qs = qs.Limit(query.PageSize, (query.PageNumber-1)*query.PageSize)
}
var tasks []*models.RetentionTask
_, err := qs.All(&tasks)
if err != nil {
return nil, err
}
return tasks, nil
}
// AppendExecHistory Append Execution History
func AppendExecHistory(t *models.RetentionTask) (int64, error) {
o := dao.GetOrmer()
return o.Insert(t)
}
*/
// CreateTask creates task record in database
func CreateTask(task *models.RetentionTask) (int64, error) {
if task == nil {
return 0, errors.New("nil task")
}
return dao.GetOrmer().Insert(task)
}
// UpdateTask updates the task record in database
func UpdateTask(task *models.RetentionTask, cols ...string) error {
if task == nil {
return errors.New("nil task")
}
if task.ID <= 0 {
return fmt.Errorf("invalid task ID: %d", task.ID)
}
_, err := dao.GetOrmer().Update(task, cols...)
return err
}
// UpdateTaskStatus updates the status of task according to the status code and revision to avoid
// override when running in concurrency
func UpdateTaskStatus(taskID int64, status string, statusCode int, statusRevision int64) error {
params := []interface{}{}
// use raw sql rather than the ORM as the sql generated by ORM isn't a "single" statement
// which means the operation isn't atomic
sql := `update retention_task set status = ?, status_code = ?, status_revision = ?, end_time = ? `
params = append(params, status, statusCode, statusRevision)
var t time.Time
// when the task is in final status, update the endtime
// when the task re-runs again, the endtime should be cleared
// so set the endtime to null if the task isn't in final status
if IsFinalStatus(status) {
t = time.Now()
}
params = append(params, t)
sql += `where id = ? and
(status_revision = ? and status_code < ? or status_revision < ?) `
params = append(params, taskID, statusRevision, statusCode, statusRevision)
_, err := dao.GetOrmer().Raw(sql, params).Exec()
return err
}
// DeleteTask deletes the task record specified by ID in database
func DeleteTask(id int64) error {
_, err := dao.GetOrmer().Delete(&models.RetentionTask{
ID: id,
})
return err
}
// GetTask get the task record specified by ID in database
func GetTask(id int64) (*models.RetentionTask, error) {
task := &models.RetentionTask{
ID: id,
}
if err := dao.GetOrmer().Read(task); err != nil {
return nil, err
}
return task, nil
}
// ListTask lists the tasks according to the query
func ListTask(query ...*q.TaskQuery) ([]*models.RetentionTask, error) {
qs := dao.GetOrmer().QueryTable(&models.RetentionTask{})
if len(query) > 0 && query[0] != nil {
q := query[0]
if q.ExecutionID > 0 {
qs = qs.Filter("ExecutionID", q.ExecutionID)
}
if len(q.Status) > 0 {
qs = qs.Filter("Status", q.Status)
}
if q.PageSize > 0 {
qs = qs.Limit(q.PageSize)
if q.PageNumber > 0 {
qs = qs.Offset((q.PageNumber - 1) * q.PageSize)
}
}
}
tasks := []*models.RetentionTask{}
_, err := qs.All(&tasks)
return tasks, err
}
// GetTotalOfTasks Count tasks
func GetTotalOfTasks(executionID int64) (int64, error) {
qs := dao.GetOrmer().QueryTable(&models.RetentionTask{})
qs = qs.Filter("ExecutionID", executionID)
return qs.Count()
}
// IsFinalStatus checks whether the status is a final status
func IsFinalStatus(status string) bool {
if status == job.StoppedStatus.String() || status == job.SuccessStatus.String() ||
status == job.ErrorStatus.String() {
return true
}
return false
}

View File

@ -11,9 +11,7 @@ import (
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/q"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMain(m *testing.M) {
@ -87,132 +85,10 @@ func TestPolicy(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, "test", p1.ScopeLevel)
err = DeletePolicyAndExec(id)
err = DeletePolicy(id)
assert.Nil(t, err)
p1, err = GetPolicy(id)
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "no row found"))
}
func TestExecution(t *testing.T) {
p := &policy.Metadata{
Algorithm: "or",
Rules: []rule.Metadata{
{
ID: 1,
Priority: 1,
Template: "latestPushedK",
Action: "retain",
Parameters: rule.Parameters{
"latestPushedK": 10,
},
TagSelectors: []*rule.Selector{
{
Kind: "doublestar",
Decoration: "matches",
Pattern: "release-[\\d\\.]+",
},
},
ScopeSelectors: map[string][]*rule.Selector{
"repository": {
{
Kind: "doublestar",
Decoration: "matches",
Pattern: ".+",
},
},
},
},
},
Trigger: &policy.Trigger{
Kind: "Schedule",
Settings: map[string]interface{}{
"cron": "* 22 11 * * *",
},
},
Scope: &policy.Scope{
Level: "project",
Reference: 1,
},
}
p1 := &models.RetentionPolicy{
ScopeLevel: p.Scope.Level,
TriggerKind: p.Trigger.Kind,
CreateTime: time.Now(),
UpdateTime: time.Now(),
}
data, _ := json.Marshal(p)
p1.Data = string(data)
policyID, err := CreatePolicy(p1)
assert.Nil(t, err)
assert.True(t, policyID > 0)
e := &models.RetentionExecution{
PolicyID: policyID,
DryRun: false,
Trigger: "manual",
StartTime: time.Now(),
}
id, err := CreateExecution(e)
assert.Nil(t, err)
assert.True(t, id > 0)
e1, err := GetExecution(id)
assert.Nil(t, err)
assert.NotNil(t, e1)
assert.EqualValues(t, id, e1.ID)
es, err := ListExecutions(policyID, nil)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(es))
}
func TestTask(t *testing.T) {
task := &models.RetentionTask{
ExecutionID: 1,
Status: "Pending",
StartTime: time.Now().Truncate(time.Second),
}
// create
id, err := CreateTask(task)
require.Nil(t, err)
// get
tk, err := GetTask(id)
require.Nil(t, err)
require.Equal(t, id, tk.ID)
require.Equal(t, "Pending", tk.Status)
// update
task.ID = id
task.Total = 1
err = UpdateTask(task, "Total")
require.Nil(t, err)
// update status
err = UpdateTaskStatus(id, "Running", 1, 1)
require.Nil(t, err)
// list
tasks, err := ListTask(&q.TaskQuery{
ExecutionID: 1,
Status: "Running",
})
require.Nil(t, err)
require.Equal(t, 1, len(tasks))
assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, "Running", tasks[0].Status)
assert.Equal(t, 1, tasks[0].StatusCode)
assert.Equal(t, int64(1), tasks[0].StatusRevision)
// update status
err = UpdateTaskStatus(id, "Stopped", 1, 2)
require.Nil(t, err)
// delete
err = DeleteTask(id)
require.Nil(t, err)
}

View File

@ -51,7 +51,7 @@ func (f *fakeRetentionManager) CreatePolicy(p *policy.Metadata) (int64, error) {
func (f *fakeRetentionManager) UpdatePolicy(p *policy.Metadata) error {
return nil
}
func (f *fakeRetentionManager) DeletePolicyAndExec(ID int64) error {
func (f *fakeRetentionManager) DeletePolicy(ID int64) error {
return nil
}
func (f *fakeRetentionManager) GetPolicy(ID int64) (*policy.Metadata, error) {

View File

@ -34,7 +34,7 @@ type Manager interface {
UpdatePolicy(p *policy.Metadata) error
// Delete the specified policy
// No actual use so far
DeletePolicyAndExec(ID int64) error
DeletePolicy(ID int64) error
// Get the specified policy
GetPolicy(ID int64) (*policy.Metadata, error)
}
@ -71,9 +71,9 @@ func (d *DefaultManager) UpdatePolicy(p *policy.Metadata) error {
return dao.UpdatePolicy(p1, "scope_level", "trigger_kind", "data", "update_time")
}
// DeletePolicyAndExec Delete Policy
func (d *DefaultManager) DeletePolicyAndExec(id int64) error {
return dao.DeletePolicyAndExec(id)
// DeletePolicy Delete Policy
func (d *DefaultManager) DeletePolicy(id int64) error {
return dao.DeletePolicy(id)
}
// GetPolicy Get Policy

View File

@ -73,7 +73,7 @@ func TestPolicy(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, "test", p1.Scope.Level)
err = m.DeletePolicyAndExec(id)
err = m.DeletePolicy(id)
assert.Nil(t, err)
p1, err = m.GetPolicy(id)