From ea2069026480f5b22a3f8bd90085191c73d51077 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 16 Jun 2020 17:58:58 +0800 Subject: [PATCH] Implement task and execution manager Implement task and execution manager Signed-off-by: Wenkai Yin --- src/common/job/client.go | 19 +- src/core/config/config.go | 2 +- src/core/config/config_test.go | 2 + src/core/main.go | 3 - src/core/middlewares/middlewares.go | 29 +-- src/core/middlewares/middlewares_test.go | 24 -- src/pkg/task/checkin.go | 3 +- src/pkg/task/dao/model.go | 2 +- src/pkg/task/dao/task.go | 7 +- src/pkg/task/dao/task_test.go | 1 + src/pkg/task/execution.go | 253 ++++++++++++++++++- src/pkg/task/execution_test.go | 267 ++++++++++++++++++++ src/pkg/task/hook.go | 63 +++++ src/pkg/task/hook_test.go | 81 ++++++ src/pkg/task/mock.go | 33 +++ src/pkg/task/mock_execution_dao_test.go | 140 ++++++++++ src/pkg/task/mock_jobservice_client_test.go | 96 +++++++ src/pkg/task/mock_task_dao_test.go | 200 +++++++++++++++ src/pkg/task/mock_task_manager_test.go | 126 +++++++++ src/pkg/task/model.go | 24 ++ src/pkg/task/task.go | 178 +++++++++++++ src/pkg/task/task_test.go | 142 +++++++++++ src/replication/operation/controller.go | 14 +- src/server/handler/job_status_hook.go | 58 +++++ src/server/route.go | 2 + 25 files changed, 1685 insertions(+), 84 deletions(-) create mode 100644 src/pkg/task/execution_test.go create mode 100644 src/pkg/task/hook.go create mode 100644 src/pkg/task/hook_test.go create mode 100644 src/pkg/task/mock.go create mode 100644 src/pkg/task/mock_execution_dao_test.go create mode 100644 src/pkg/task/mock_jobservice_client_test.go create mode 100644 src/pkg/task/mock_task_dao_test.go create mode 100644 src/pkg/task/mock_task_manager_test.go create mode 100644 src/pkg/task/task_test.go create mode 100644 src/server/handler/job_status_hook.go diff --git a/src/common/job/client.go b/src/common/job/client.go index 7c22d891a..1030360b3 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -3,6 +3,7 @@ package job import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" @@ -18,10 +19,12 @@ import ( var ( // GlobalClient is an instance of the default client that can be used globally - // Notes: the client needs to be initialized before can be used - GlobalClient Client - statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running" - statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern) + GlobalClient Client = NewDefaultClient(config.InternalJobServiceURL(), config.CoreSecret()) + statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running" + statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern) + + // ErrJobNotFound indicates the job not found + ErrJobNotFound = errors.New("job not found") ) // Client wraps interface to access jobservice. @@ -54,11 +57,6 @@ type DefaultClient struct { client *commonhttp.Client } -// Init the GlobalClient -func Init() { - GlobalClient = NewDefaultClient(config.InternalJobServiceURL(), config.CoreSecret()) -} - // NewDefaultClient creates a default client based on endpoint and secret. func NewDefaultClient(endpoint, secret string) *DefaultClient { var c *commonhttp.Client @@ -206,6 +204,9 @@ func (d *DefaultClient) PostAction(uuid, action string) error { status: status, } } + if e, ok := err.(*commonhttp.Error); ok && e.Code == http.StatusNotFound { + return ErrJobNotFound + } return err } return nil diff --git a/src/core/config/config.go b/src/core/config/config.go index 709cbe2d4..20f9d5880 100755 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -231,7 +231,7 @@ func RegistryURL() (string, error) { // InternalJobServiceURL returns jobservice URL for internal communication between Harbor containers func InternalJobServiceURL() string { - return strings.TrimSuffix(cfgMgr.Get(common.JobServiceURL).GetString(), "/") + return os.Getenv("JOBSERVICE_URL") } // GetCoreURL returns the url of core from env diff --git a/src/core/config/config_test.go b/src/core/config/config_test.go index 6974d86b5..005d6e7ea 100644 --- a/src/core/config/config_test.go +++ b/src/core/config/config_test.go @@ -58,6 +58,8 @@ func TestConfig(t *testing.T) { } defer os.Setenv("TOKEN_PRIVATE_KEY_PATH", oriKeyPath) + os.Setenv("JOBSERVICE_URL", "http://myjob:8888") + Init() if err := Load(); err != nil { diff --git a/src/core/main.go b/src/core/main.go index 5894eeeb5..07ff85ed0 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -27,7 +27,6 @@ import ( "github.com/goharbor/harbor/src/common/dao" common_http "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils" _ "github.com/goharbor/harbor/src/controller/event/handler" @@ -126,8 +125,6 @@ func main() { log.Fatalf("failed to load config: %v", err) } - // init the jobservice client - job.Init() // init the scheduler scheduler.Init() diff --git a/src/core/middlewares/middlewares.go b/src/core/middlewares/middlewares.go index c0fd0753b..b94362fee 100644 --- a/src/core/middlewares/middlewares.go +++ b/src/core/middlewares/middlewares.go @@ -15,23 +15,20 @@ package middlewares import ( - "github.com/goharbor/harbor/src/server/middleware/csrf" - "github.com/goharbor/harbor/src/server/middleware/log" - "github.com/goharbor/harbor/src/server/middleware/requestid" - "net/http" - "path" - "regexp" - "strings" - "github.com/astaxie/beego" "github.com/docker/distribution/reference" "github.com/goharbor/harbor/src/server/middleware" + "github.com/goharbor/harbor/src/server/middleware/csrf" + "github.com/goharbor/harbor/src/server/middleware/log" "github.com/goharbor/harbor/src/server/middleware/notification" "github.com/goharbor/harbor/src/server/middleware/orm" "github.com/goharbor/harbor/src/server/middleware/readonly" + "github.com/goharbor/harbor/src/server/middleware/requestid" "github.com/goharbor/harbor/src/server/middleware/security" "github.com/goharbor/harbor/src/server/middleware/session" "github.com/goharbor/harbor/src/server/middleware/transaction" + "net/http" + "regexp" ) var ( @@ -61,18 +58,6 @@ var ( } ) -// legacyAPISkipper skip middleware for legacy APIs -func legacyAPISkipper(r *http.Request) bool { - path := path.Clean(r.URL.EscapedPath()) - for _, prefix := range []string{"/v2/", "/api/v2.0/"} { - if strings.HasPrefix(path, prefix) { - return false - } - } - - return true -} - // MiddleWares returns global middlewares func MiddleWares() []beego.MiddleWare { return []beego.MiddleWare{ @@ -82,9 +67,9 @@ func MiddleWares() []beego.MiddleWare { csrf.Middleware(), security.Middleware(), readonly.Middleware(readonlySkippers...), - orm.Middleware(legacyAPISkipper), + orm.Middleware(), // notification must ahead of transaction ensure the DB transaction execution complete notification.Middleware(), - transaction.Middleware(legacyAPISkipper, fetchBlobAPISkipper), + transaction.Middleware(fetchBlobAPISkipper), } } diff --git a/src/core/middlewares/middlewares_test.go b/src/core/middlewares/middlewares_test.go index d21a013d5..036cdae1e 100644 --- a/src/core/middlewares/middlewares_test.go +++ b/src/core/middlewares/middlewares_test.go @@ -42,30 +42,6 @@ func Test_fetchBlobAPISkipper(t *testing.T) { } } -func Test_legacyAPISkipper(t *testing.T) { - type args struct { - r *http.Request - } - tests := []struct { - name string - args args - want bool - }{ - {"/api/v2.0/projects", args{httptest.NewRequest(http.MethodGet, "/api/v2.0/projects", nil)}, false}, - {"//api/v2.0/projects", args{httptest.NewRequest(http.MethodGet, "//api/v2.0/projects", nil)}, false}, - {"/api/v2.0//projects", args{httptest.NewRequest(http.MethodGet, "/api/v2.0//projects", nil)}, false}, - {"/v2/library/photon/tags", args{httptest.NewRequest(http.MethodGet, "/v2/library/photon/tags", nil)}, false}, - {"/api/projects", args{httptest.NewRequest(http.MethodGet, "/api/projects", nil)}, true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := legacyAPISkipper(tt.args.r); got != tt.want { - t.Errorf("legacyAPISkipper() = %v, want %v", got, tt.want) - } - }) - } -} - func Test_readonlySkipper(t *testing.T) { type args struct { r *http.Request diff --git a/src/pkg/task/checkin.go b/src/pkg/task/checkin.go index 01788dd65..7b6a90fb7 100644 --- a/src/pkg/task/checkin.go +++ b/src/pkg/task/checkin.go @@ -15,6 +15,7 @@ package task import ( + "context" "fmt" "github.com/goharbor/harbor/src/jobservice/job" @@ -25,7 +26,7 @@ var ( ) // CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook -type CheckInProcessor func(task *Task, change *job.StatusChange) (err error) +type CheckInProcessor func(ctx context.Context, task *Task, change *job.StatusChange) (err error) // Register check in processor for the specific vendor type func Register(vendorType string, processor CheckInProcessor) error { diff --git a/src/pkg/task/dao/model.go b/src/pkg/task/dao/model.go index c3bf8f408..3ca06b5ee 100644 --- a/src/pkg/task/dao/model.go +++ b/src/pkg/task/dao/model.go @@ -45,7 +45,7 @@ type Execution struct { type Task struct { ID int64 `orm:"pk;auto;column(id)"` ExecutionID int64 `orm:"column(execution_id)"` - JobID int64 `orm:"column(job_id)"` + JobID string `orm:"column(job_id)"` Status string `orm:"column(status)"` StatusCode int `orm:"column(status_code)"` StatusRevision int64 `orm:"column(status_revision)"` diff --git a/src/pkg/task/dao/task.go b/src/pkg/task/dao/task.go index 0a6bc0a7e..1ec3cc169 100644 --- a/src/pkg/task/dao/task.go +++ b/src/pkg/task/dao/task.go @@ -146,17 +146,18 @@ func (t *taskDAO) UpdateStatus(ctx context.Context, id int64, status string, sta jobStatus := job.Status(status) statusCode := jobStatus.Code() var endTime time.Time + now := time.Now() // when the task is in final status, update the end time // when the task re-runs again, the end time should be cleared, so set the end time // to null if the task isn't in final status if jobStatus.Final() { - endTime = time.Now() + endTime = now } // use raw sql rather than the ORM as the sql generated by ORM isn't a "single" statement // which means the operation isn't atomic, this will cause issues when running in concurrency - sql = `update task set status = ?, status_code = ?, status_revision = ?, end_time = ? + sql = `update task set status = ?, status_code = ?, status_revision = ?, update_time = ?, end_time = ? where id = ? and (status_revision = ? and status_code < ? or status_revision < ?) ` - _, err = ormer.Raw(sql, status, statusCode, statusRevision, endTime, + _, err = ormer.Raw(sql, status, statusCode, statusRevision, now, endTime, id, statusRevision, statusCode, statusRevision).Exec() return err } diff --git a/src/pkg/task/dao/task_test.go b/src/pkg/task/dao/task_test.go index 9a2184fc3..eb59c1054 100644 --- a/src/pkg/task/dao/task_test.go +++ b/src/pkg/task/dao/task_test.go @@ -145,6 +145,7 @@ func (t *taskDAOTestSuite) TestUpdateStatus() { t.Equal(status, task.Status) t.Equal(job.RunningStatus.Code(), task.StatusCode) t.Equal(statusRevision, task.StatusRevision) + t.NotEqual(time.Time{}, task.UpdateTime) t.Equal(time.Time{}, task.EndTime) // update status to success diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index be79d4da6..34022daf9 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -16,24 +16,40 @@ package task import ( "context" + "encoding/json" "time" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task/dao" +) + +var ( + // ExecMgr is a global execution manager instance + ExecMgr = NewExecutionManager() ) // ExecutionManager manages executions. // The execution and task managers provide an execution-task model to abstract the interactive with jobservice. // All of the operations with jobservice should be delegated by them type ExecutionManager interface { - // Create an execution. The "vendorType" specifies the type of vendor (replication, scan, gc, retention, etc.), - // and the "vendorID" specifies the ID of vendor if needed(policy ID for replication and retention). The - // "extraAttrs" can be used to set the customized attributes + // Create an execution. The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.), + // and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention). + // The "extraAttrs" can be used to set the customized attributes Create(ctx context.Context, vendorType string, vendorID int64, trigger string, extraAttrs ...map[string]interface{}) (id int64, err error) - // UpdateStatus updates the status of the execution. - // In most cases, the execution status can be calculated from the referenced tasks automatically. - // When the execution contains no tasks or failed to create tasks, the status should be set manually - UpdateStatus(ctx context.Context, id int64, status, message string, endTime time.Time) (err error) + // MarkDone marks the status of the specified execution as success. + // It must be called to update the execution status if the created execution contains no tasks. + // In other cases, the execution status can be calculated from the referenced tasks automatically + // and no need to update it explicitly + MarkDone(ctx context.Context, id int64, message string) (err error) + // MarkError marks the status of the specified execution as error. + // It must be called to update the execution status when failed to create tasks. + // In other cases, the execution status can be calculated from the referenced tasks automatically + // and no need to update it explicitly + MarkError(ctx context.Context, id int64, message string) (err error) // Stop all linked tasks of the specified execution Stop(ctx context.Context, id int64) (err error) // Delete the specified execution and its tasks @@ -43,3 +59,226 @@ type ExecutionManager interface { // List executions according to the query List(ctx context.Context, query *q.Query) (executions []*Execution, err error) } + +// NewExecutionManager return an instance of the default execution manager +func NewExecutionManager() ExecutionManager { + return &executionManager{ + executionDAO: dao.NewExecutionDAO(), + taskMgr: Mgr, + taskDAO: dao.NewTaskDAO(), + } +} + +type executionManager struct { + executionDAO dao.ExecutionDAO + taskMgr Manager + taskDAO dao.TaskDAO +} + +func (e *executionManager) Create(ctx context.Context, vendorType string, vendorID int64, trigger string, + extraAttrs ...map[string]interface{}) (int64, error) { + extras := map[string]interface{}{} + if len(extraAttrs) > 0 && extraAttrs[0] != nil { + extras = extraAttrs[0] + } + data, err := json.Marshal(extras) + if err != nil { + return 0, err + } + + execution := &dao.Execution{ + VendorType: vendorType, + VendorID: vendorID, + Trigger: trigger, + ExtraAttrs: string(data), + StartTime: time.Now(), + } + return e.executionDAO.Create(ctx, execution) +} + +func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error { + return e.executionDAO.Update(ctx, &dao.Execution{ + ID: id, + Status: job.SuccessStatus.String(), + StatusMessage: message, + EndTime: time.Now(), + }, "Status", "StatusMessage", "EndTime") +} + +func (e *executionManager) MarkError(ctx context.Context, id int64, message string) error { + return e.executionDAO.Update(ctx, &dao.Execution{ + ID: id, + Status: job.ErrorStatus.String(), + StatusMessage: message, + EndTime: time.Now(), + }, "Status", "StatusMessage", "EndTime") +} + +func (e *executionManager) Stop(ctx context.Context, id int64) error { + tasks, err := e.taskDAO.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "ExecutionID": id, + }, + }) + if err != nil { + return err + } + for _, task := range tasks { + if err = e.taskMgr.Stop(ctx, task.ID); err != nil { + log.Errorf("failed to stop task %d: %v", task.ID, err) + continue + } + } + return nil +} + +func (e *executionManager) Delete(ctx context.Context, id int64) error { + tasks, err := e.taskDAO.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "ExecutionID": id, + }, + }) + if err != nil { + return err + } + + for _, task := range tasks { + if !job.Status(task.Status).Final() { + return errors.New(nil).WithCode(errors.PreconditionCode). + WithMessage("the execution %d has tasks that aren't in final status, stop the tasks first", id) + } + if err = e.taskDAO.Delete(ctx, task.ID); err != nil { + return err + } + } + + return e.executionDAO.Delete(ctx, id) +} + +func (e *executionManager) Get(ctx context.Context, id int64) (*Execution, error) { + execution, err := e.executionDAO.Get(ctx, id) + if err != nil { + return nil, err + } + return e.populateExecution(ctx, execution), nil +} + +func (e *executionManager) List(ctx context.Context, query *q.Query) ([]*Execution, error) { + executions, err := e.executionDAO.List(ctx, query) + if err != nil { + return nil, err + } + var execs []*Execution + for _, execution := range executions { + execs = append(execs, e.populateExecution(ctx, execution)) + } + return execs, nil +} + +func (e *executionManager) populateExecution(ctx context.Context, execution *dao.Execution) *Execution { + exec := &Execution{ + ID: execution.ID, + VendorType: execution.VendorType, + VendorID: execution.VendorID, + Status: execution.Status, + StatusMessage: execution.StatusMessage, + Metrics: nil, + Trigger: execution.Trigger, + StartTime: execution.StartTime, + EndTime: execution.EndTime, + } + + if len(execution.ExtraAttrs) > 0 { + extras := map[string]interface{}{} + if err := json.Unmarshal([]byte(execution.ExtraAttrs), &extras); err != nil { + log.Errorf("failed to unmarshal the extra attributes of execution %d: %v", execution.ID, err) + } else { + exec.ExtraAttrs = extras + } + } + + // if the status isn't null which means the status is set manually, return directly + if len(exec.Status) > 0 { + return exec + } + + // populate task metrics + e.populateExecutionMetrics(ctx, exec) + // populate status + e.populateExecutionStatus(exec) + // populate the end time + e.populateExecutionEndTime(ctx, exec) + + return exec +} + +func (e *executionManager) populateExecutionMetrics(ctx context.Context, execution *Execution) { + scs, err := e.taskDAO.ListStatusCount(ctx, execution.ID) + if err != nil { + log.Errorf("failed to list status count of execution %d: %v", execution.ID, err) + return + } + if len(scs) == 0 { + return + } + + metrics := &Metrics{} + for _, sc := range scs { + switch sc.Status { + case job.SuccessStatus.String(): + metrics.SuccessTaskCount = sc.Count + case job.ErrorStatus.String(): + metrics.ErrorTaskCount = sc.Count + case job.PendingStatus.String(): + metrics.PendingTaskCount = sc.Count + case job.RunningStatus.String(): + metrics.RunningTaskCount = sc.Count + case job.ScheduledStatus.String(): + metrics.ScheduledTaskCount = sc.Count + case job.StoppedStatus.String(): + metrics.StoppedTaskCount = sc.Count + default: + log.Errorf("unknown task status: %s", sc.Status) + } + } + metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount + + metrics.PendingTaskCount + metrics.RunningTaskCount + + metrics.ScheduledTaskCount + metrics.StoppedTaskCount + execution.Metrics = metrics +} + +func (e *executionManager) populateExecutionStatus(execution *Execution) { + metrics := execution.Metrics + if metrics == nil { + execution.Status = job.RunningStatus.String() + return + } + if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 { + execution.Status = job.RunningStatus.String() + return + } + if metrics.ErrorTaskCount > 0 { + execution.Status = job.ErrorStatus.String() + return + } + if metrics.StoppedTaskCount > 0 { + execution.Status = job.StoppedStatus.String() + return + } + if metrics.SuccessTaskCount > 0 { + execution.Status = job.SuccessStatus.String() + return + } +} + +func (e *executionManager) populateExecutionEndTime(ctx context.Context, execution *Execution) { + if !job.Status(execution.Status).Final() { + return + } + endTime, err := e.taskDAO.GetMaxEndTime(ctx, execution.ID) + if err != nil { + log.Errorf("failed to get the max end time of the execution %d: %v", execution.ID, err) + return + } + execution.EndTime = endTime +} diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go new file mode 100644 index 000000000..09b4493cc --- /dev/null +++ b/src/pkg/task/execution_test.go @@ -0,0 +1,267 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "testing" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/pkg/task/dao" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type executionManagerTestSuite struct { + suite.Suite + execMgr *executionManager + taskMgr *mockTaskManager + execDAO *mockExecutionDAO + taskDAO *mockTaskDAO +} + +func (e *executionManagerTestSuite) SetupTest() { + e.taskMgr = &mockTaskManager{} + e.execDAO = &mockExecutionDAO{} + e.taskDAO = &mockTaskDAO{} + e.execMgr = &executionManager{ + executionDAO: e.execDAO, + taskMgr: e.taskMgr, + taskDAO: e.taskDAO, + } +} + +func (e *executionManagerTestSuite) TestCreate() { + e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual, + map[string]interface{}{"k": "v"}) + e.Require().Nil(err) + e.Equal(int64(1), id) + e.execDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestMarkDone() { + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + err := e.execMgr.MarkDone(nil, 1, "success") + e.Require().Nil(err) + e.execDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestMarkError() { + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + err := e.execMgr.MarkError(nil, 1, "error") + e.Require().Nil(err) + e.execDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestStop() { + e.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ + { + ID: 1, + ExecutionID: 1, + }, + }, nil) + e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) + err := e.execMgr.Stop(nil, 1) + e.Require().Nil(err) + e.taskDAO.AssertExpectations(e.T()) + e.taskMgr.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestDelete() { + // try to delete the execution which contains running tasks + e.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ + { + ID: 1, + ExecutionID: 1, + Status: job.RunningStatus.String(), + }, + }, nil) + err := e.execMgr.Delete(nil, 1) + e.Require().NotNil(err) + e.True(errors.IsErr(err, errors.PreconditionCode)) + e.taskDAO.AssertExpectations(e.T()) + + // reset the mock + e.SetupTest() + + e.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ + { + ID: 1, + ExecutionID: 1, + Status: job.SuccessStatus.String(), + }, + }, nil) + e.taskDAO.On("Delete", mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Delete", mock.Anything, mock.Anything).Return(nil) + err = e.execMgr.Delete(nil, 1) + e.Require().Nil(err) + e.taskDAO.AssertExpectations(e.T()) + e.execDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestGet() { + e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{ + ID: 1, + Status: job.SuccessStatus.String(), + }, nil) + exec, err := e.execMgr.Get(nil, 1) + e.Require().Nil(err) + e.Equal(int64(1), exec.ID) + e.Equal(job.SuccessStatus.String(), exec.Status) + e.execDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestList() { + e.execDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Execution{ + { + ID: 1, + Status: job.SuccessStatus.String(), + }, + }, nil) + execs, err := e.execMgr.List(nil, nil) + e.Require().Nil(err) + e.Require().Len(execs, 1) + e.Equal(int64(1), execs[0].ID) + e.Equal(job.SuccessStatus.String(), execs[0].Status) + e.execDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestPopulateExecutionMetrics() { + e.taskDAO.On("ListStatusCount", mock.Anything, mock.Anything).Return([]*dao.StatusCount{ + { + Status: job.SuccessStatus.String(), + Count: 1, + }, + { + Status: job.ErrorStatus.String(), + Count: 1, + }, + { + Status: job.StoppedStatus.String(), + Count: 1, + }, + { + Status: job.RunningStatus.String(), + Count: 1, + }, + { + Status: job.PendingStatus.String(), + Count: 1, + }, + { + Status: job.ScheduledStatus.String(), + Count: 1, + }, + }, nil) + exec := &Execution{} + e.execMgr.populateExecutionMetrics(nil, exec) + e.Require().NotNil(exec.Metrics) + e.Equal(int64(6), exec.Metrics.TaskCount) + e.Equal(int64(1), exec.Metrics.SuccessTaskCount) + e.Equal(int64(1), exec.Metrics.ErrorTaskCount) + e.Equal(int64(1), exec.Metrics.StoppedTaskCount) + e.Equal(int64(1), exec.Metrics.PendingTaskCount) + e.Equal(int64(1), exec.Metrics.RunningTaskCount) + e.Equal(int64(1), exec.Metrics.ScheduledTaskCount) + e.taskDAO.AssertExpectations(e.T()) +} + +func (e *executionManagerTestSuite) TestPopulateExecutionStatus() { + // running + exec := &Execution{} + e.execMgr.populateExecutionStatus(exec) + e.Equal(job.RunningStatus.String(), exec.Status) + + // running + exec = &Execution{ + Metrics: &Metrics{ + SuccessTaskCount: 1, + ErrorTaskCount: 1, + PendingTaskCount: 1, + RunningTaskCount: 1, + ScheduledTaskCount: 1, + StoppedTaskCount: 1, + }, + } + e.execMgr.populateExecutionStatus(exec) + e.Equal(job.RunningStatus.String(), exec.Status) + + // error + exec = &Execution{ + Metrics: &Metrics{ + SuccessTaskCount: 1, + ErrorTaskCount: 1, + PendingTaskCount: 0, + RunningTaskCount: 0, + ScheduledTaskCount: 0, + StoppedTaskCount: 1, + }, + } + e.execMgr.populateExecutionStatus(exec) + e.Equal(job.ErrorStatus.String(), exec.Status) + + // stopped + exec = &Execution{ + Metrics: &Metrics{ + SuccessTaskCount: 1, + ErrorTaskCount: 0, + PendingTaskCount: 0, + RunningTaskCount: 0, + ScheduledTaskCount: 0, + StoppedTaskCount: 1, + }, + } + e.execMgr.populateExecutionStatus(exec) + e.Equal(job.StoppedStatus.String(), exec.Status) + + // success + exec = &Execution{ + Metrics: &Metrics{ + SuccessTaskCount: 1, + ErrorTaskCount: 0, + PendingTaskCount: 0, + RunningTaskCount: 0, + ScheduledTaskCount: 0, + StoppedTaskCount: 0, + }, + } + e.execMgr.populateExecutionStatus(exec) + e.Equal(job.SuccessStatus.String(), exec.Status) +} + +func (e *executionManagerTestSuite) TestPopulateExecutionEndTime() { + // isn't final status + exec := &Execution{ + Status: job.RunningStatus.String(), + } + e.execMgr.populateExecutionEndTime(nil, exec) + e.Equal(time.Time{}, exec.EndTime) + + // final status + now := time.Now() + exec = &Execution{ + Status: job.SuccessStatus.String(), + } + e.taskDAO.On("GetMaxEndTime", mock.Anything, mock.Anything).Return(now, nil) + e.execMgr.populateExecutionEndTime(nil, exec) + e.Equal(now, exec.EndTime) + e.taskDAO.AssertExpectations(e.T()) +} + +func TestExecutionManagerSuite(t *testing.T) { + suite.Run(t, &executionManagerTestSuite{}) +} diff --git a/src/pkg/task/hook.go b/src/pkg/task/hook.go new file mode 100644 index 000000000..c5aed5bbd --- /dev/null +++ b/src/pkg/task/hook.go @@ -0,0 +1,63 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/pkg/task/dao" +) + +// NewHookHandler creates a hook handler instance +func NewHookHandler() *HookHandler { + return &HookHandler{ + taskDAO: dao.NewTaskDAO(), + executionDAO: dao.NewExecutionDAO(), + } +} + +// HookHandler handles the job status changing webhook +type HookHandler struct { + taskDAO dao.TaskDAO + executionDAO dao.ExecutionDAO +} + +// Handle the job status changing webhook +func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusChange) error { + // process check in data + if len(sc.CheckIn) > 0 { + task, err := h.taskDAO.Get(ctx, taskID) + if err != nil { + return err + } + execution, err := h.executionDAO.Get(ctx, task.ExecutionID) + if err != nil { + return err + } + + processor, exist := registry[execution.VendorType] + if !exist { + return fmt.Errorf("the check in processor for task %d not found", taskID) + } + t := &Task{} + t.From(task) + return processor(ctx, t, sc) + } + + // update status + return h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision) +} diff --git a/src/pkg/task/hook_test.go b/src/pkg/task/hook_test.go new file mode 100644 index 000000000..5a3dd97ca --- /dev/null +++ b/src/pkg/task/hook_test.go @@ -0,0 +1,81 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "testing" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/pkg/task/dao" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type hookHandlerTestSuite struct { + suite.Suite + handler *HookHandler + execDAO *mockExecutionDAO + taskDAO *mockTaskDAO +} + +func (h *hookHandlerTestSuite) SetupTest() { + h.execDAO = &mockExecutionDAO{} + h.taskDAO = &mockTaskDAO{} + h.handler = &HookHandler{ + taskDAO: h.taskDAO, + executionDAO: h.execDAO, + } +} + +func (h *hookHandlerTestSuite) TestHandle() { + // handle check in data + registry["test"] = func(ctx context.Context, task *Task, change *job.StatusChange) (err error) { return nil } + h.taskDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ + ID: 1, + ExecutionID: 1, + }, nil) + h.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{ + ID: 1, + VendorType: "test", + }, nil) + sc := &job.StatusChange{ + CheckIn: "data", + } + err := h.handler.Handle(nil, 1, sc) + h.Require().Nil(err) + h.taskDAO.AssertExpectations(h.T()) + h.execDAO.AssertExpectations(h.T()) + + // reset mock + h.SetupTest() + + // handle status changing + h.taskDAO.On("UpdateStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + sc = &job.StatusChange{ + Status: job.SuccessStatus.String(), + Metadata: &job.StatsInfo{ + Revision: time.Now().Unix(), + }, + } + err = h.handler.Handle(nil, 1, sc) + h.Require().Nil(err) + h.taskDAO.AssertExpectations(h.T()) +} + +func TestHookHandlerTestSuite(t *testing.T) { + suite.Run(t, &hookHandlerTestSuite{}) +} diff --git a/src/pkg/task/mock.go b/src/pkg/task/mock.go new file mode 100644 index 000000000..1c1761aa4 --- /dev/null +++ b/src/pkg/task/mock.go @@ -0,0 +1,33 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +//go:generate mockery -dir ./dao -name TaskDAO -output . -outpkg task -filename mock_task_dao_test.go -structname mockTaskDAO +//go:generate mockery -dir ./dao -name ExecutionDAO -output . -outpkg task -filename mock_execution_dao_test.go -structname mockExecutionDAO +// Need to modify the generated mock code manually to avoid the compile error: https://github.com/vektra/mockery/issues/293 +/* +func (_m *mockTaskManager) Create(ctx context.Context, executionID int64, job *Job, extraAttrs ...map[string]interface{}) (int64, error) { + _va := make([]interface{}, len(extraAttrs)) + for _i := range extraAttrs { + _va[_i] = extraAttrs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, executionID, job) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + ... +*/ +//go:generate mockery -name Manager -output . -outpkg task -filename mock_task_manager_test.go -structname mockTaskManager -inpkg +//go:generate mockery -dir ../../common/job -name Client -output . -outpkg task -filename mock_jobservice_client_test.go -structname mockJobserviceClient diff --git a/src/pkg/task/mock_execution_dao_test.go b/src/pkg/task/mock_execution_dao_test.go new file mode 100644 index 000000000..0de324e81 --- /dev/null +++ b/src/pkg/task/mock_execution_dao_test.go @@ -0,0 +1,140 @@ +// Code generated by mockery v1.1.2. DO NOT EDIT. + +package task + +import ( + context "context" + + dao "github.com/goharbor/harbor/src/pkg/task/dao" + mock "github.com/stretchr/testify/mock" + + q "github.com/goharbor/harbor/src/lib/q" +) + +// mockExecutionDAO is an autogenerated mock type for the ExecutionDAO type +type mockExecutionDAO struct { + mock.Mock +} + +// Count provides a mock function with given fields: ctx, query +func (_m *mockExecutionDAO) Count(ctx context.Context, query *q.Query) (int64, error) { + ret := _m.Called(ctx, query) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) int64); ok { + r0 = rf(ctx, query) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Create provides a mock function with given fields: ctx, execution +func (_m *mockExecutionDAO) Create(ctx context.Context, execution *dao.Execution) (int64, error) { + ret := _m.Called(ctx, execution) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *dao.Execution) int64); ok { + r0 = rf(ctx, execution) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *dao.Execution) error); ok { + r1 = rf(ctx, execution) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Delete provides a mock function with given fields: ctx, id +func (_m *mockExecutionDAO) Delete(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *mockExecutionDAO) Get(ctx context.Context, id int64) (*dao.Execution, error) { + ret := _m.Called(ctx, id) + + var r0 *dao.Execution + if rf, ok := ret.Get(0).(func(context.Context, int64) *dao.Execution); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*dao.Execution) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, query +func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Execution, error) { + ret := _m.Called(ctx, query) + + var r0 []*dao.Execution + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*dao.Execution); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dao.Execution) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: ctx, execution, props +func (_m *mockExecutionDAO) Update(ctx context.Context, execution *dao.Execution, props ...string) error { + _va := make([]interface{}, len(props)) + for _i := range props { + _va[_i] = props[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, execution) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *dao.Execution, ...string) error); ok { + r0 = rf(ctx, execution, props...) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/src/pkg/task/mock_jobservice_client_test.go b/src/pkg/task/mock_jobservice_client_test.go new file mode 100644 index 000000000..419301290 --- /dev/null +++ b/src/pkg/task/mock_jobservice_client_test.go @@ -0,0 +1,96 @@ +// Code generated by mockery v1.1.2. DO NOT EDIT. + +package task + +import ( + job "github.com/goharbor/harbor/src/jobservice/job" + mock "github.com/stretchr/testify/mock" + + models "github.com/goharbor/harbor/src/common/job/models" +) + +// mockJobserviceClient is an autogenerated mock type for the Client type +type mockJobserviceClient struct { + mock.Mock +} + +// GetExecutions provides a mock function with given fields: uuid +func (_m *mockJobserviceClient) GetExecutions(uuid string) ([]job.Stats, error) { + ret := _m.Called(uuid) + + var r0 []job.Stats + if rf, ok := ret.Get(0).(func(string) []job.Stats); ok { + r0 = rf(uuid) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]job.Stats) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(uuid) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetJobLog provides a mock function with given fields: uuid +func (_m *mockJobserviceClient) GetJobLog(uuid string) ([]byte, error) { + ret := _m.Called(uuid) + + var r0 []byte + if rf, ok := ret.Get(0).(func(string) []byte); ok { + r0 = rf(uuid) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(uuid) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PostAction provides a mock function with given fields: uuid, action +func (_m *mockJobserviceClient) PostAction(uuid string, action string) error { + ret := _m.Called(uuid, action) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(uuid, action) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SubmitJob provides a mock function with given fields: _a0 +func (_m *mockJobserviceClient) SubmitJob(_a0 *models.JobData) (string, error) { + ret := _m.Called(_a0) + + var r0 string + if rf, ok := ret.Get(0).(func(*models.JobData) string); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(string) + } + + var r1 error + if rf, ok := ret.Get(1).(func(*models.JobData) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/src/pkg/task/mock_task_dao_test.go b/src/pkg/task/mock_task_dao_test.go new file mode 100644 index 000000000..56efa6030 --- /dev/null +++ b/src/pkg/task/mock_task_dao_test.go @@ -0,0 +1,200 @@ +// Code generated by mockery v1.1.2. DO NOT EDIT. + +package task + +import ( + context "context" + + dao "github.com/goharbor/harbor/src/pkg/task/dao" + mock "github.com/stretchr/testify/mock" + + q "github.com/goharbor/harbor/src/lib/q" + + time "time" +) + +// mockTaskDAO is an autogenerated mock type for the TaskDAO type +type mockTaskDAO struct { + mock.Mock +} + +// Count provides a mock function with given fields: ctx, query +func (_m *mockTaskDAO) Count(ctx context.Context, query *q.Query) (int64, error) { + ret := _m.Called(ctx, query) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) int64); ok { + r0 = rf(ctx, query) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Create provides a mock function with given fields: ctx, _a1 +func (_m *mockTaskDAO) Create(ctx context.Context, _a1 *dao.Task) (int64, error) { + ret := _m.Called(ctx, _a1) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *dao.Task) int64); ok { + r0 = rf(ctx, _a1) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *dao.Task) error); ok { + r1 = rf(ctx, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Delete provides a mock function with given fields: ctx, id +func (_m *mockTaskDAO) Delete(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *mockTaskDAO) Get(ctx context.Context, id int64) (*dao.Task, error) { + ret := _m.Called(ctx, id) + + var r0 *dao.Task + if rf, ok := ret.Get(0).(func(context.Context, int64) *dao.Task); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*dao.Task) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMaxEndTime provides a mock function with given fields: ctx, executionID +func (_m *mockTaskDAO) GetMaxEndTime(ctx context.Context, executionID int64) (time.Time, error) { + ret := _m.Called(ctx, executionID) + + var r0 time.Time + if rf, ok := ret.Get(0).(func(context.Context, int64) time.Time); ok { + r0 = rf(ctx, executionID) + } else { + r0 = ret.Get(0).(time.Time) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, executionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, query +func (_m *mockTaskDAO) List(ctx context.Context, query *q.Query) ([]*dao.Task, error) { + ret := _m.Called(ctx, query) + + var r0 []*dao.Task + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*dao.Task); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dao.Task) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ListStatusCount provides a mock function with given fields: ctx, executionID +func (_m *mockTaskDAO) ListStatusCount(ctx context.Context, executionID int64) ([]*dao.StatusCount, error) { + ret := _m.Called(ctx, executionID) + + var r0 []*dao.StatusCount + if rf, ok := ret.Get(0).(func(context.Context, int64) []*dao.StatusCount); ok { + r0 = rf(ctx, executionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dao.StatusCount) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, executionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: ctx, _a1, props +func (_m *mockTaskDAO) Update(ctx context.Context, _a1 *dao.Task, props ...string) error { + _va := make([]interface{}, len(props)) + for _i := range props { + _va[_i] = props[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, _a1) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *dao.Task, ...string) error); ok { + r0 = rf(ctx, _a1, props...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateStatus provides a mock function with given fields: ctx, id, status, statusRevision +func (_m *mockTaskDAO) UpdateStatus(ctx context.Context, id int64, status string, statusRevision int64) error { + ret := _m.Called(ctx, id, status, statusRevision) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string, int64) error); ok { + r0 = rf(ctx, id, status, statusRevision) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/src/pkg/task/mock_task_manager_test.go b/src/pkg/task/mock_task_manager_test.go new file mode 100644 index 000000000..c56282117 --- /dev/null +++ b/src/pkg/task/mock_task_manager_test.go @@ -0,0 +1,126 @@ +// Code generated by mockery v1.1.2. DO NOT EDIT. + +package task + +import ( + context "context" + + q "github.com/goharbor/harbor/src/lib/q" + mock "github.com/stretchr/testify/mock" +) + +// mockTaskManager is an autogenerated mock type for the Manager type +type mockTaskManager struct { + mock.Mock +} + +// Create provides a mock function with given fields: ctx, executionID, job, extraAttrs +func (_m *mockTaskManager) Create(ctx context.Context, executionID int64, job *Job, extraAttrs ...map[string]interface{}) (int64, error) { + _va := make([]interface{}, len(extraAttrs)) + for _i := range extraAttrs { + _va[_i] = extraAttrs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, executionID, job) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, int64, *Job, ...map[string]interface{}) int64); ok { + r0 = rf(ctx, executionID, job, extraAttrs...) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64, *Job, ...map[string]interface{}) error); ok { + r1 = rf(ctx, executionID, job, extraAttrs...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *mockTaskManager) Get(ctx context.Context, id int64) (*Task, error) { + ret := _m.Called(ctx, id) + + var r0 *Task + if rf, ok := ret.Get(0).(func(context.Context, int64) *Task); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*Task) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetLog provides a mock function with given fields: ctx, id +func (_m *mockTaskManager) GetLog(ctx context.Context, id int64) ([]byte, error) { + ret := _m.Called(ctx, id) + + var r0 []byte + if rf, ok := ret.Get(0).(func(context.Context, int64) []byte); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, query +func (_m *mockTaskManager) List(ctx context.Context, query *q.Query) ([]*Task, error) { + ret := _m.Called(ctx, query) + + var r0 []*Task + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*Task); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*Task) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Stop provides a mock function with given fields: ctx, id +func (_m *mockTaskManager) Stop(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/src/pkg/task/model.go b/src/pkg/task/model.go index 089ad40c9..08f884796 100644 --- a/src/pkg/task/model.go +++ b/src/pkg/task/model.go @@ -15,9 +15,12 @@ package task import ( + "encoding/json" "time" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/pkg/task/dao" ) // const definitions @@ -87,6 +90,27 @@ type Task struct { EndTime time.Time `json:"end_time"` } +// From constructs a task from DAO model +func (t *Task) From(task *dao.Task) { + t.ID = task.ID + t.ExecutionID = task.ExecutionID + t.Status = task.Status + t.StatusMessage = task.StatusMessage + t.RunCount = task.RunCount + t.CreationTime = task.CreationTime + t.StartTime = task.StartTime + t.UpdateTime = task.UpdateTime + t.EndTime = task.EndTime + if len(task.ExtraAttrs) > 0 { + extras := map[string]interface{}{} + if err := json.Unmarshal([]byte(task.ExtraAttrs), &extras); err != nil { + log.Errorf("failed to unmarshal the extra attributes of task %d: %v", task.ID, err) + return + } + t.ExtraAttrs = extras + } +} + // Job is the model represents the requested jobservice job type Job struct { Name string diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go index deee43c4d..a02571dae 100644 --- a/src/pkg/task/task.go +++ b/src/pkg/task/task.go @@ -16,8 +16,22 @@ package task import ( "context" + "encoding/json" + "fmt" + "time" + cjob "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task/dao" +) + +var ( + // Mgr is a global task manager instance + Mgr = NewManager() ) // Manager manages tasks. @@ -37,3 +51,167 @@ type Manager interface { // Get the log of the specified task GetLog(ctx context.Context, id int64) (log []byte, err error) } + +// NewManager creates an instance of the default task manager +func NewManager() Manager { + return &manager{ + dao: dao.NewTaskDAO(), + jsClient: cjob.GlobalClient, + coreURL: config.GetCoreURL(), + } +} + +type manager struct { + dao dao.TaskDAO + jsClient cjob.Client + coreURL string +} + +func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraAttrs ...map[string]interface{}) (int64, error) { + // create task record in database + id, err := m.createTaskRecord(ctx, executionID, extraAttrs...) + if err != nil { + return 0, err + } + log.Debugf("the database record for task %d created", id) + + // submit job to jobservice + jobID, err := m.submitJob(ctx, id, jb) + if err != nil { + // failed to submit job to jobservice, update the status of task to error + log.Errorf("failed to submit job to jobservice: %v", err) + now := time.Now() + err = m.dao.Update(ctx, &dao.Task{ + ID: id, + Status: job.ErrorStatus.String(), + StatusCode: job.ErrorStatus.Code(), + StatusMessage: err.Error(), + UpdateTime: now, + EndTime: now, + }, "Status", "StatusCode", "StatusMessage", "UpdateTime", "EndTime") + if err != nil { + log.Errorf("failed to update task %d: %v", id, err) + } + return id, nil + } + + log.Debugf("the task %d is submitted to jobservice, the job ID is %s", id, jobID) + + // populate the job ID for the task + if err = m.dao.Update(ctx, &dao.Task{ + ID: id, + JobID: jobID, + }, "JobID"); err != nil { + log.Errorf("failed to populate the job ID for the task %d: %v", id, err) + } + + return id, nil +} + +func (m *manager) createTaskRecord(ctx context.Context, executionID int64, extraAttrs ...map[string]interface{}) (int64, error) { + extras := map[string]interface{}{} + if len(extraAttrs) > 0 && extraAttrs[0] != nil { + extras = extraAttrs[0] + } + data, err := json.Marshal(extras) + if err != nil { + return 0, err + } + + now := time.Now() + return m.dao.Create(ctx, &dao.Task{ + ExecutionID: executionID, + Status: job.PendingStatus.String(), + StatusCode: job.PendingStatus.Code(), + ExtraAttrs: string(data), + CreationTime: now, + UpdateTime: now, + }) +} + +func (m *manager) submitJob(ctx context.Context, id int64, jb *Job) (string, error) { + jobData := &models.JobData{ + Name: jb.Name, + StatusHook: fmt.Sprintf("%s/service/notifications/tasks/%d", m.coreURL, id), + } + if jb.Parameters != nil { + jobData.Parameters = models.Parameters(jb.Parameters) + } + if jb.Metadata != nil { + jobData.Metadata = &models.JobMetadata{ + JobKind: jb.Metadata.JobKind, + ScheduleDelay: jb.Metadata.ScheduleDelay, + Cron: jb.Metadata.Cron, + IsUnique: jb.Metadata.IsUnique, + } + } + + return m.jsClient.SubmitJob(jobData) +} + +func (m *manager) Stop(ctx context.Context, id int64) error { + task, err := m.dao.Get(ctx, id) + if err != nil { + return err + } + + // if the task is already in final status, return directly + if job.Status(task.Status).Final() { + log.Debugf("the task %d is in final status %s, skip", task.ID, task.Status) + return nil + } + + if err = m.jsClient.PostAction(task.JobID, string(job.StopCommand)); err != nil { + // job not found, update it's status to stop directly + if err == cjob.ErrJobNotFound { + now := time.Now() + err = m.dao.Update(ctx, &dao.Task{ + ID: task.ID, + Status: job.StoppedStatus.String(), + StatusCode: job.StoppedStatus.Code(), + UpdateTime: now, + EndTime: now, + }, "Status", "StatusCode", "UpdateTime", "EndTime") + if err != nil { + return err + } + log.Debugf("got job not found error for task %d, update it's status to stop directly", task.ID) + return nil + } + return err + } + log.Debugf("the stop request for task %d is sent", id) + return nil +} + +func (m *manager) Get(ctx context.Context, id int64) (*Task, error) { + task, err := m.dao.Get(ctx, id) + if err != nil { + return nil, err + } + t := &Task{} + t.From(task) + return t, nil +} + +func (m *manager) List(ctx context.Context, query *q.Query) ([]*Task, error) { + tasks, err := m.dao.List(ctx, query) + if err != nil { + return nil, err + } + var ts []*Task + for _, task := range tasks { + t := &Task{} + t.From(task) + ts = append(ts, t) + } + return ts, nil +} + +func (m *manager) GetLog(ctx context.Context, id int64) ([]byte, error) { + task, err := m.dao.Get(ctx, id) + if err != nil { + return nil, err + } + return m.jsClient.GetJobLog(task.JobID) +} diff --git a/src/pkg/task/task_test.go b/src/pkg/task/task_test.go new file mode 100644 index 000000000..246b62155 --- /dev/null +++ b/src/pkg/task/task_test.go @@ -0,0 +1,142 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "errors" + cjob "github.com/goharbor/harbor/src/common/job" + "testing" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/pkg/task/dao" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type taskManagerTestSuite struct { + suite.Suite + mgr *manager + dao *mockTaskDAO + jsClient *mockJobserviceClient +} + +func (t *taskManagerTestSuite) SetupTest() { + t.dao = &mockTaskDAO{} + t.jsClient = &mockJobserviceClient{} + t.mgr = &manager{ + dao: t.dao, + jsClient: t.jsClient, + } +} + +func (t *taskManagerTestSuite) TestCreate() { + // success to submit job to jobservice + t.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + t.jsClient.On("SubmitJob", mock.Anything).Return("1", nil) + t.dao.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + id, err := t.mgr.Create(nil, 1, &Job{}, map[string]interface{}{"a": "b"}) + t.Require().Nil(err) + t.Equal(int64(1), id) + t.dao.AssertExpectations(t.T()) + t.jsClient.AssertExpectations(t.T()) + + // reset mock + t.SetupTest() + + // failed to submit job to jobservice + t.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + t.jsClient.On("SubmitJob", mock.Anything).Return("", errors.New("error")) + t.dao.On("Update", mock.Anything, mock.Anything, mock.Anything, + mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + id, err = t.mgr.Create(nil, 1, &Job{}, map[string]interface{}{"a": "b"}) + t.Require().Nil(err) + t.Equal(int64(1), id) + t.dao.AssertExpectations(t.T()) + t.jsClient.AssertExpectations(t.T()) +} + +func (t *taskManagerTestSuite) TestStop() { + // the task is in final status + t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ + ID: 1, + ExecutionID: 1, + Status: job.SuccessStatus.String(), + }, nil) + + err := t.mgr.Stop(nil, 1) + t.Require().Nil(err) + t.dao.AssertExpectations(t.T()) + + // reset mock + t.SetupTest() + + // the task isn't in final status, job not found + t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ + ID: 1, + ExecutionID: 1, + Status: job.RunningStatus.String(), + }, nil) + t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound) + t.dao.On("Update", mock.Anything, mock.Anything, + mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = t.mgr.Stop(nil, 1) + t.Require().Nil(err) + t.dao.AssertExpectations(t.T()) + t.jsClient.AssertExpectations(t.T()) + + // reset mock + t.SetupTest() + + // the task isn't in final status + t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ + ID: 1, + ExecutionID: 1, + Status: job.RunningStatus.String(), + }, nil) + t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(nil) + err = t.mgr.Stop(nil, 1) + t.Require().Nil(err) + t.dao.AssertExpectations(t.T()) + t.jsClient.AssertExpectations(t.T()) +} + +func (t *taskManagerTestSuite) TestGet() { + t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ + ID: 1, + }, nil) + task, err := t.mgr.Get(nil, 1) + t.Require().Nil(err) + t.Equal(int64(1), task.ID) + t.dao.AssertExpectations(t.T()) +} + +func (t *taskManagerTestSuite) TestList() { + t.dao.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ + { + ID: 1, + }, + }, nil) + tasks, err := t.mgr.List(nil, nil) + t.Require().Nil(err) + t.Require().Len(tasks, 1) + t.Equal(int64(1), tasks[0].ID) + t.dao.AssertExpectations(t.T()) +} + +func TestTaskManagerTestSuite(t *testing.T) { + suite.Run(t, &taskManagerTestSuite{}) +} diff --git a/src/replication/operation/controller.go b/src/replication/operation/controller.go index d630d59e7..06a29226d 100644 --- a/src/replication/operation/controller.go +++ b/src/replication/operation/controller.go @@ -16,7 +16,6 @@ package operation import ( "fmt" - "strings" "time" "github.com/goharbor/harbor/src/common/job" @@ -47,10 +46,6 @@ const ( maxReplicators = 1024 ) -var ( - jobNotFoundErrorMsg = "object is not found" -) - // NewController returns a controller implementation func NewController(js job.Client) Controller { ctl := &controller{ @@ -180,7 +175,7 @@ func (c *controller) StopReplication(executionID int64) error { } continue } - if isJobNotFoundError(err) { + if err == job.ErrJobNotFound { e := c.executionMgr.UpdateTask(&models.Task{ ID: task.ID, Status: models.ExecutionStatusStopped, @@ -213,13 +208,6 @@ func isTaskInFinalStatus(task *models.Task) bool { return false } -func isJobNotFoundError(err error) bool { - if err == nil { - return false - } - return strings.Contains(err.Error(), jobNotFoundErrorMsg) -} - func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { return c.executionMgr.List(query...) } diff --git a/src/server/handler/job_status_hook.go b/src/server/handler/job_status_hook.go new file mode 100644 index 000000000..0161446a3 --- /dev/null +++ b/src/server/handler/job_status_hook.go @@ -0,0 +1,58 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "encoding/json" + "net/http" + "strconv" + + "github.com/goharbor/harbor/src/jobservice/job" + libhttp "github.com/goharbor/harbor/src/lib/http" + "github.com/goharbor/harbor/src/pkg/task" + "github.com/goharbor/harbor/src/server/router" +) + +// NewJobStatusHandler creates a handler to handle the job status changing +func NewJobStatusHandler() http.Handler { + return &jobStatusHandler{ + handler: task.NewHookHandler(), + } +} + +type jobStatusHandler struct { + handler *task.HookHandler +} + +func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + taskIDParam := router.Param(r.Context(), ":id") + taskID, err := strconv.ParseInt(taskIDParam, 10, 64) + if err != nil { + libhttp.SendError(w, err) + return + } + + sc := &job.StatusChange{} + if err = json.NewDecoder(r.Body).Decode(sc); err != nil { + libhttp.SendError(w, err) + return + } + + if err = j.handler.Handle(r.Context(), taskID, sc); err != nil { + libhttp.SendError(w, err) + return + } +} diff --git a/src/server/route.go b/src/server/route.go index 11ee6c357..a89eae75e 100644 --- a/src/server/route.go +++ b/src/server/route.go @@ -24,6 +24,7 @@ import ( "github.com/goharbor/harbor/src/core/service/notifications/jobs" "github.com/goharbor/harbor/src/core/service/notifications/scheduler" "github.com/goharbor/harbor/src/core/service/token" + "github.com/goharbor/harbor/src/server/handler" "github.com/goharbor/harbor/src/server/router" "net/http" ) @@ -52,6 +53,7 @@ func registerRoutes() { beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask") beego.Router("/service/notifications/schedules/:id([0-9]+)", &scheduler.Handler{}, "post:Handle") beego.Router("/service/notifications/jobs/scan/:uuid", &jobs.Handler{}, "post:HandleScan") + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler()) beego.Router("/service/token", &token.Handler{})