Merge pull request #12239 from ywk253100/200615_task_manager_dao

Implement execution/task DAO for task manager
This commit is contained in:
Steven Zou 2020-06-17 17:56:00 +08:00 committed by GitHub
commit ffd889f82a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 676 additions and 0 deletions

View File

@ -0,0 +1,134 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"context"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
)
// ExecutionDAO is the data access object interface for execution
type ExecutionDAO interface {
// Count returns the total count of executions according to the query
Count(ctx context.Context, query *q.Query) (count int64, err error)
// List the executions according to the query
List(ctx context.Context, query *q.Query) (executions []*Execution, err error)
// Get the specified execution
Get(ctx context.Context, id int64) (execution *Execution, err error)
// Create an execution
Create(ctx context.Context, execution *Execution) (id int64, err error)
// Update the specified execution. Only the properties specified by "props" will be updated if it is set
Update(ctx context.Context, execution *Execution, props ...string) (err error)
// Delete the specified execution
Delete(ctx context.Context, id int64) (err error)
}
// NewExecutionDAO returns an instance of ExecutionDAO
func NewExecutionDAO() ExecutionDAO {
return &executionDAO{}
}
type executionDAO struct{}
func (e *executionDAO) Count(ctx context.Context, query *q.Query) (int64, error) {
if query != nil {
// ignore the page number and size
query = &q.Query{
Keywords: query.Keywords,
}
}
qs, err := orm.QuerySetter(ctx, &Execution{}, query)
if err != nil {
return 0, err
}
return qs.Count()
}
func (e *executionDAO) List(ctx context.Context, query *q.Query) ([]*Execution, error) {
executions := []*Execution{}
qs, err := orm.QuerySetter(ctx, &Execution{}, query)
if err != nil {
return nil, err
}
qs = qs.OrderBy("-StartTime")
if _, err = qs.All(&executions); err != nil {
return nil, err
}
return executions, nil
}
func (e *executionDAO) Get(ctx context.Context, id int64) (*Execution, error) {
execution := &Execution{
ID: id,
}
ormer, err := orm.FromContext(ctx)
if err != nil {
return nil, err
}
if err := ormer.Read(execution); err != nil {
if e := orm.AsNotFoundError(err, "execution %d not found", id); e != nil {
err = e
}
return nil, err
}
return execution, nil
}
func (e *executionDAO) Create(ctx context.Context, execution *Execution) (int64, error) {
ormer, err := orm.FromContext(ctx)
if err != nil {
return 0, err
}
return ormer.Insert(execution)
}
func (e *executionDAO) Update(ctx context.Context, execution *Execution, props ...string) error {
ormer, err := orm.FromContext(ctx)
if err != nil {
return err
}
n, err := ormer.Update(execution, props...)
if err != nil {
return err
}
if n == 0 {
return errors.NotFoundError(nil).WithMessage("execution %d not found", execution.ID)
}
return nil
}
func (e *executionDAO) Delete(ctx context.Context, id int64) error {
ormer, err := orm.FromContext(ctx)
if err != nil {
return err
}
n, err := ormer.Delete(&Execution{
ID: id,
})
if err != nil {
if e := orm.AsForeignKeyError(err,
"the execution %d is referenced by other tasks", id); e != nil {
err = e
}
return err
}
if n == 0 {
return errors.NotFoundError(nil).WithMessage("execution %d not found", id)
}
return nil
}

View File

@ -0,0 +1,121 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"context"
"testing"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/stretchr/testify/suite"
)
type executionDAOTestSuite struct {
suite.Suite
ctx context.Context
executionDAO *executionDAO
executionID int64
}
func (e *executionDAOTestSuite) SetupSuite() {
dao.PrepareTestForPostgresSQL()
e.ctx = orm.Context()
e.executionDAO = &executionDAO{}
}
func (e *executionDAOTestSuite) SetupTest() {
id, err := e.executionDAO.Create(e.ctx, &Execution{
VendorType: "test",
Trigger: "test",
ExtraAttrs: "{}",
})
e.Require().Nil(err)
e.executionID = id
}
func (e *executionDAOTestSuite) TearDownTest() {
err := e.executionDAO.Delete(e.ctx, e.executionID)
e.Nil(err)
}
func (e *executionDAOTestSuite) TestCount() {
count, err := e.executionDAO.Count(e.ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": "test",
},
})
e.Require().Nil(err)
e.Equal(int64(1), count)
}
func (e *executionDAOTestSuite) TestList() {
executions, err := e.executionDAO.List(e.ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": "test",
},
})
e.Require().Nil(err)
e.Require().Len(executions, 1)
e.Equal(e.executionID, executions[0].ID)
}
func (e *executionDAOTestSuite) TestGet() {
// not exist
_, err := e.executionDAO.Get(e.ctx, 10000)
e.Require().NotNil(err)
e.True(errors.IsNotFoundErr(err))
// exist
execution, err := e.executionDAO.Get(e.ctx, e.executionID)
e.Require().Nil(err)
e.NotNil(execution)
}
func (e *executionDAOTestSuite) TestCreate() {
// happy pass is covered by SetupTest
}
func (e *executionDAOTestSuite) TestUpdate() {
// not exist
err := e.executionDAO.Update(e.ctx, &Execution{ID: 10000}, "Status")
e.Require().NotNil(err)
e.True(errors.IsNotFoundErr(err))
// exist
err = e.executionDAO.Update(e.ctx, &Execution{
ID: e.executionID,
Status: "failed",
}, "Status")
e.Require().Nil(err)
execution, err := e.executionDAO.Get(e.ctx, e.executionID)
e.Require().Nil(err)
e.Equal("failed", execution.Status)
}
func (e *executionDAOTestSuite) TestDelete() {
// not exist
err := e.executionDAO.Delete(e.ctx, 10000)
e.Require().NotNil(err)
e.True(errors.IsNotFoundErr(err))
// happy pass is covered by TearDownTest
}
func TestExecutionDAOSuite(t *testing.T) {
suite.Run(t, &executionDAOTestSuite{})
}

View File

@ -57,3 +57,9 @@ type Task struct {
UpdateTime time.Time `orm:"column(update_time)"`
EndTime time.Time `orm:"column(end_time)"`
}
// StatusCount model
type StatusCount struct {
Status string `orm:"column(status)"`
Count int64 `orm:"column(count)"`
}

204
src/pkg/task/dao/task.go Normal file
View File

@ -0,0 +1,204 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"context"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
)
// TaskDAO is the data access object interface for task
type TaskDAO interface {
// Count returns the total count of tasks according to the query
Count(ctx context.Context, query *q.Query) (count int64, err error)
// List the tasks according to the query
List(ctx context.Context, query *q.Query) (tasks []*Task, err error)
// Get the specified task
Get(ctx context.Context, id int64) (task *Task, err error)
// Create a task
Create(ctx context.Context, task *Task) (id int64, err error)
// Update the specified task. Only the properties specified by "props" will be updated if it is set
Update(ctx context.Context, task *Task, props ...string) (err error)
// UpdateStatus updates the status of task
UpdateStatus(ctx context.Context, id int64, status string, statusRevision int64) (err error)
// Delete the specified task
Delete(ctx context.Context, id int64) (err error)
// ListStatusCount lists the status count for the tasks reference the specified execution
ListStatusCount(ctx context.Context, executionID int64) (statusCounts []*StatusCount, err error)
// GetMaxEndTime gets the max end time for the tasks references the specified execution
GetMaxEndTime(ctx context.Context, executionID int64) (endTime time.Time, err error)
}
// NewTaskDAO returns an instance of TaskDAO
func NewTaskDAO() TaskDAO {
return &taskDAO{}
}
type taskDAO struct{}
func (t *taskDAO) Count(ctx context.Context, query *q.Query) (int64, error) {
if query != nil {
// ignore the page number and size
query = &q.Query{
Keywords: query.Keywords,
}
}
qs, err := orm.QuerySetter(ctx, &Task{}, query)
if err != nil {
return 0, err
}
return qs.Count()
}
func (t *taskDAO) List(ctx context.Context, query *q.Query) ([]*Task, error) {
tasks := []*Task{}
qs, err := orm.QuerySetter(ctx, &Task{}, query)
if err != nil {
return nil, err
}
qs = qs.OrderBy("-StartTime")
if _, err = qs.All(&tasks); err != nil {
return nil, err
}
return tasks, nil
}
func (t *taskDAO) Get(ctx context.Context, id int64) (*Task, error) {
task := &Task{
ID: id,
}
ormer, err := orm.FromContext(ctx)
if err != nil {
return nil, err
}
if err := ormer.Read(task); err != nil {
if e := orm.AsNotFoundError(err, "task %d not found", id); e != nil {
err = e
}
return nil, err
}
return task, nil
}
func (t *taskDAO) Create(ctx context.Context, task *Task) (int64, error) {
ormer, err := orm.FromContext(ctx)
if err != nil {
return 0, err
}
id, err := ormer.Insert(task)
if err != nil {
if e := orm.AsForeignKeyError(err,
"the task tries to reference a non existing execution %d", task.ExecutionID); e != nil {
err = e
}
return 0, err
}
return id, nil
}
func (t *taskDAO) Update(ctx context.Context, task *Task, props ...string) error {
ormer, err := orm.FromContext(ctx)
if err != nil {
return err
}
n, err := ormer.Update(task, props...)
if err != nil {
return err
}
if n == 0 {
return errors.NotFoundError(nil).WithMessage("task %d not found", task.ID)
}
return nil
}
func (t *taskDAO) UpdateStatus(ctx context.Context, id int64, status string, statusRevision int64) error {
ormer, err := orm.FromContext(ctx)
if err != nil {
return err
}
// status revision is the unix timestamp of job starting time, it's changing means a retrying of the job
startTime := time.Unix(statusRevision, 0)
// update run count and start time when status revision changes
sql := `update task set run_count = run_count +1, start_time = ?
where id = ? and status_revision < ?`
if _, err = ormer.Raw(sql, startTime, id, statusRevision).Exec(); err != nil {
return err
}
jobStatus := job.Status(status)
statusCode := jobStatus.Code()
var endTime time.Time
// when the task is in final status, update the end time
// when the task re-runs again, the end time should be cleared, so set the end time
// to null if the task isn't in final status
if jobStatus.Final() {
endTime = time.Now()
}
// use raw sql rather than the ORM as the sql generated by ORM isn't a "single" statement
// which means the operation isn't atomic, this will cause issues when running in concurrency
sql = `update task set status = ?, status_code = ?, status_revision = ?, end_time = ?
where id = ? and (status_revision = ? and status_code < ? or status_revision < ?) `
_, err = ormer.Raw(sql, status, statusCode, statusRevision, endTime,
id, statusRevision, statusCode, statusRevision).Exec()
return err
}
func (t *taskDAO) Delete(ctx context.Context, id int64) error {
ormer, err := orm.FromContext(ctx)
if err != nil {
return err
}
n, err := ormer.Delete(&Task{
ID: id,
})
if err != nil {
return err
}
if n == 0 {
return errors.NotFoundError(nil).WithMessage("task %d not found", id)
}
return nil
}
func (t *taskDAO) ListStatusCount(ctx context.Context, executionID int64) ([]*StatusCount, error) {
ormer, err := orm.FromContext(ctx)
if err != nil {
return nil, err
}
statusCounts := []*StatusCount{}
_, err = ormer.Raw("select status, count(*) as count from task where execution_id=? group by status", executionID).
QueryRows(&statusCounts)
if err != nil {
return nil, err
}
return statusCounts, nil
}
func (t *taskDAO) GetMaxEndTime(ctx context.Context, executionID int64) (time.Time, error) {
ormer, err := orm.FromContext(ctx)
if err != nil {
return time.Time{}, err
}
var endTime time.Time
err = ormer.Raw("select max(end_time) from task where execution_id = ?", executionID).
QueryRow(&endTime)
return endTime, nil
}

View File

@ -0,0 +1,211 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"context"
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/stretchr/testify/suite"
)
type taskDAOTestSuite struct {
suite.Suite
ctx context.Context
taskDAO *taskDAO
executionDAO *executionDAO
executionID int64
taskID int64
}
func (t *taskDAOTestSuite) SetupSuite() {
t.ctx = orm.Context()
t.taskDAO = &taskDAO{}
t.executionDAO = &executionDAO{}
}
func (t *taskDAOTestSuite) SetupTest() {
id, err := t.executionDAO.Create(t.ctx, &Execution{
VendorType: "test",
Trigger: "test",
ExtraAttrs: "{}",
})
t.Require().Nil(err)
t.executionID = id
id, err = t.taskDAO.Create(t.ctx, &Task{
ExecutionID: t.executionID,
Status: "success",
StatusCode: 1,
ExtraAttrs: "{}",
})
t.Require().Nil(err)
t.taskID = id
}
func (t *taskDAOTestSuite) TearDownTest() {
err := t.taskDAO.Delete(t.ctx, t.taskID)
t.Nil(err)
err = t.executionDAO.Delete(t.ctx, t.executionID)
t.Nil(err)
}
func (t *taskDAOTestSuite) TestCount() {
count, err := t.taskDAO.Count(t.ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": t.executionID,
},
})
t.Require().Nil(err)
t.Equal(int64(1), count)
}
func (t *taskDAOTestSuite) TestList() {
tasks, err := t.taskDAO.List(t.ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": t.executionID,
},
})
t.Require().Nil(err)
t.Require().Len(tasks, 1)
t.Equal(t.taskID, tasks[0].ID)
}
func (t *taskDAOTestSuite) TestGet() {
// not exist
_, err := t.taskDAO.Get(t.ctx, 10000)
t.Require().NotNil(err)
t.True(errors.IsNotFoundErr(err))
// exist
task, err := t.taskDAO.Get(t.ctx, t.taskID)
t.Require().Nil(err)
t.NotNil(task)
}
func (t *taskDAOTestSuite) TestCreate() {
// reference the non-existing execution
_, err := t.taskDAO.Create(t.ctx, &Task{
ExecutionID: 10000,
Status: "success",
StatusCode: 1,
ExtraAttrs: "{}",
})
t.Require().NotNil(err)
t.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode))
// reference the existing execution is covered by SetupTest
}
func (t *taskDAOTestSuite) TestUpdate() {
// not exist
err := t.taskDAO.Update(t.ctx, &Task{ID: 10000}, "Status")
t.Require().NotNil(err)
t.True(errors.IsNotFoundErr(err))
// exist
err = t.taskDAO.Update(t.ctx, &Task{
ID: t.taskID,
Status: "failed",
}, "Status")
t.Require().Nil(err)
task, err := t.taskDAO.Get(t.ctx, t.taskID)
t.Require().Nil(err)
t.Equal("failed", task.Status)
}
func (t *taskDAOTestSuite) TestUpdateStatus() {
// update status to running
status := job.RunningStatus.String()
statusRevision := time.Now().Unix()
err := t.taskDAO.UpdateStatus(t.ctx, t.taskID, status, statusRevision)
t.Require().Nil(err)
task, err := t.taskDAO.Get(t.ctx, t.taskID)
t.Require().Nil(err)
t.Equal(1, task.RunCount)
t.True(time.Unix(statusRevision, 0).Equal(task.StartTime))
t.Equal(status, task.Status)
t.Equal(job.RunningStatus.Code(), task.StatusCode)
t.Equal(statusRevision, task.StatusRevision)
t.Equal(time.Time{}, task.EndTime)
// update status to success
status = job.SuccessStatus.String()
err = t.taskDAO.UpdateStatus(t.ctx, t.taskID, status, statusRevision)
t.Require().Nil(err)
task, err = t.taskDAO.Get(t.ctx, t.taskID)
t.Require().Nil(err)
t.Equal(1, task.RunCount)
t.True(time.Unix(statusRevision, 0).Equal(task.StartTime))
t.Equal(status, task.Status)
t.Equal(job.SuccessStatus.Code(), task.StatusCode)
t.Equal(statusRevision, task.StatusRevision)
t.NotEqual(time.Time{}, task.EndTime)
// update status to running again with different revision
status = job.RunningStatus.String()
statusRevision = time.Now().Add(1 * time.Second).Unix()
err = t.taskDAO.UpdateStatus(t.ctx, t.taskID, status, statusRevision)
t.Require().Nil(err)
task, err = t.taskDAO.Get(t.ctx, t.taskID)
t.Require().Nil(err)
t.Equal(2, task.RunCount)
t.True(time.Unix(statusRevision, 0).Equal(task.StartTime))
t.Equal(status, task.Status)
t.Equal(job.RunningStatus.Code(), task.StatusCode)
t.Equal(statusRevision, task.StatusRevision)
t.Equal(time.Time{}, task.EndTime)
}
func (t *taskDAOTestSuite) TestDelete() {
// not exist
err := t.taskDAO.Delete(t.ctx, 10000)
t.Require().NotNil(err)
t.True(errors.IsNotFoundErr(err))
// happy pass is covered by TearDownTest
}
func (t *taskDAOTestSuite) TestListStatusCount() {
scs, err := t.taskDAO.ListStatusCount(t.ctx, t.executionID)
t.Require().Nil(err)
t.Require().Len(scs, 1)
t.Equal("success", scs[0].Status)
t.Equal(int64(1), scs[0].Count)
}
func (t *taskDAOTestSuite) TestGetMaxEndTime() {
now := time.Now()
err := t.taskDAO.Update(t.ctx, &Task{
ID: t.taskID,
EndTime: now,
}, "EndTime")
t.Require().Nil(err)
endTime, err := t.taskDAO.GetMaxEndTime(t.ctx, t.executionID)
t.Require().Nil(err)
t.Equal(now.Unix(), endTime.Unix())
}
func TestTaskDAOSuite(t *testing.T) {
suite.Run(t, &taskDAOTestSuite{})
}