From 127988b70c68442782cfcf8939bd2259bce492f6 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Mon, 8 Jun 2020 11:35:18 +0800 Subject: [PATCH] Define the task manager interface and data model Define the task manager interface and data model Signed-off-by: Wenkai Yin --- .../postgresql/0040_2.1.0_schema.up.sql | 30 ++++++ src/pkg/task/checkin.go | 37 ++++++++ src/pkg/task/checkin_test.go | 30 ++++++ src/pkg/task/dao/model.go | 59 ++++++++++++ src/pkg/task/execution.go | 45 +++++++++ src/pkg/task/model.go | 95 +++++++++++++++++++ src/pkg/task/task.go | 39 ++++++++ 7 files changed, 335 insertions(+) create mode 100644 src/pkg/task/checkin.go create mode 100644 src/pkg/task/checkin_test.go create mode 100644 src/pkg/task/dao/model.go create mode 100644 src/pkg/task/execution.go create mode 100644 src/pkg/task/model.go create mode 100644 src/pkg/task/task.go diff --git a/make/migrations/postgresql/0040_2.1.0_schema.up.sql b/make/migrations/postgresql/0040_2.1.0_schema.up.sql index 78977a40f..667b9db78 100644 --- a/make/migrations/postgresql/0040_2.1.0_schema.up.sql +++ b/make/migrations/postgresql/0040_2.1.0_schema.up.sql @@ -1 +1,31 @@ ALTER TABLE project ADD COLUMN IF NOT EXISTS registry_id int; + +CREATE TABLE IF NOT EXISTS execution ( + id SERIAL NOT NULL, + vendor_type varchar(16) NOT NULL, + vendor_id int, + status varchar(16), + status_message text, + trigger varchar(16) NOT NULL, + extra_attrs JSON, + start_time timestamp DEFAULT CURRENT_TIMESTAMP, + end_time timestamp, + PRIMARY KEY (id) +); + +CREATE TABLE IF NOT EXISTS task ( + id SERIAL PRIMARY KEY NOT NULL, + execution_id int NOT NULL, + job_id varchar(64), + status varchar(16) NOT NULL, + status_code int NOT NULL, + status_revision int, + status_message text, + run_count int, + extra_attrs JSON, + creation_time timestamp DEFAULT CURRENT_TIMESTAMP, + start_time timestamp, + update_time timestamp, + end_time timestamp, + FOREIGN KEY (execution_id) REFERENCES execution(id) +); diff --git a/src/pkg/task/checkin.go b/src/pkg/task/checkin.go new file mode 100644 index 000000000..01788dd65 --- /dev/null +++ b/src/pkg/task/checkin.go @@ -0,0 +1,37 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "fmt" + + "github.com/goharbor/harbor/src/jobservice/job" +) + +var ( + registry = map[string]CheckInProcessor{} +) + +// CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook +type CheckInProcessor func(task *Task, change *job.StatusChange) (err error) + +// Register check in processor for the specific vendor type +func Register(vendorType string, processor CheckInProcessor) error { + if _, exist := registry[vendorType]; exist { + return fmt.Errorf("check in processor for %s already exists", vendorType) + } + registry[vendorType] = processor + return nil +} diff --git a/src/pkg/task/checkin_test.go b/src/pkg/task/checkin_test.go new file mode 100644 index 000000000..279d08557 --- /dev/null +++ b/src/pkg/task/checkin_test.go @@ -0,0 +1,30 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRegister(t *testing.T) { + err := Register("test", nil) + assert.Nil(t, err) + + // already exist + err = Register("test", nil) + assert.NotNil(t, err) +} diff --git a/src/pkg/task/dao/model.go b/src/pkg/task/dao/model.go new file mode 100644 index 000000000..78bea57e8 --- /dev/null +++ b/src/pkg/task/dao/model.go @@ -0,0 +1,59 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "time" + + "github.com/astaxie/beego/orm" +) + +func init() { + orm.RegisterModel(&Execution{}) + orm.RegisterModel(&Task{}) +} + +// Execution database model +type Execution struct { + ID int64 `orm:"pk;auto;column(id)"` + VendorType string `orm:"column(vendor_type)"` + VendorID int64 `orm:"column(vendor_id)"` + // In most of cases, the status should be calculated from the referenced tasks. + // When the execution contains no task or failed to create tasks, the status should + // be set manually + Status string `orm:"column(status)"` + StatusMessage string `orm:"column(status_message)"` + Trigger string `orm:"column(trigger)"` + ExtraAttrs string `orm:"column(extra_attrs)"` // json string + StartTime time.Time `orm:"column(start_time)"` + EndTime time.Time `orm:"column(end_time)"` +} + +// Task database model +type Task struct { + ID int64 `orm:"pk;auto;column(id)"` + ExecutionID int64 `orm:"column(execution_id)"` + JobID int64 `orm:"column(job_id)"` + Status string `orm:"column(status)"` + StatusCode int `orm:"column(status_code)"` + StatusRevision int64 `orm:"column(status_revision)"` + StatusMessage string `orm:"column(status_message)"` + RunCount int `orm:"column(run_count)"` + ExtraAttrs string `orm:"column(extra_attrs)"` // json string + CreationTime time.Time `orm:"column(creation_time)"` + StartTime time.Time `orm:"column(start_time)"` + UpdateTime time.Time `orm:"column(update_time)"` + EndTime time.Time `orm:"column(end_time)"` +} diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go new file mode 100644 index 000000000..be79d4da6 --- /dev/null +++ b/src/pkg/task/execution.go @@ -0,0 +1,45 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "time" + + "github.com/goharbor/harbor/src/lib/q" +) + +// ExecutionManager manages executions. +// The execution and task managers provide an execution-task model to abstract the interactive with jobservice. +// All of the operations with jobservice should be delegated by them +type ExecutionManager interface { + // Create an execution. The "vendorType" specifies the type of vendor (replication, scan, gc, retention, etc.), + // and the "vendorID" specifies the ID of vendor if needed(policy ID for replication and retention). The + // "extraAttrs" can be used to set the customized attributes + Create(ctx context.Context, vendorType string, vendorID int64, trigger string, + extraAttrs ...map[string]interface{}) (id int64, err error) + // UpdateStatus updates the status of the execution. + // In most cases, the execution status can be calculated from the referenced tasks automatically. + // When the execution contains no tasks or failed to create tasks, the status should be set manually + UpdateStatus(ctx context.Context, id int64, status, message string, endTime time.Time) (err error) + // Stop all linked tasks of the specified execution + Stop(ctx context.Context, id int64) (err error) + // Delete the specified execution and its tasks + Delete(ctx context.Context, id int64) (err error) + // Get the specified execution + Get(ctx context.Context, id int64) (execution *Execution, err error) + // List executions according to the query + List(ctx context.Context, query *q.Query) (executions []*Execution, err error) +} diff --git a/src/pkg/task/model.go b/src/pkg/task/model.go new file mode 100644 index 000000000..089ad40c9 --- /dev/null +++ b/src/pkg/task/model.go @@ -0,0 +1,95 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "time" + + "github.com/goharbor/harbor/src/jobservice/job" +) + +// const definitions +const ( + ExecutionVendorTypeReplication = "REPLICATION" + ExecutionVendorTypeGarbageCollection = "GARBAGE_COLLECTION" + ExecutionVendorTypeRetention = "RETENTION" + ExecutionVendorTypeScan = "SCAN" + ExecutionVendorTypeScanAll = "SCAN_ALL" + ExecutionVendorTypeScheduler = "SCHEDULER" + + ExecutionTriggerManual = "MANUAL" + ExecutionTriggerSchedule = "SCHEDULE" + ExecutionTriggerEvent = "EVENT" +) + +// Execution is one run for one action. It contains one or more tasks and provides the summary view of the tasks +type Execution struct { + ID int64 `json:"id"` + // indicate the execution type: replication/GC/retention/scan/etc. + VendorType string `json:"vendor_type"` + // the ID of vendor policy/rule/etc. e.g. replication policy ID + VendorID int64 `json:"vendor_id"` + Status string `json:"status"` + // the detail message to explain the status in some cases. e.g. + // 1. After creating the execution, there may be some errors before creating tasks, the + // "StatusMessage" can contain the error message + // 2. The execution may contain no tasks, "StatusMessage" can be used to explain the case + StatusMessage string `json:"status_message"` + Metrics *Metrics `json:"metrics"` + // trigger type: manual/schedule/event + Trigger string `json:"trigger"` + // the customized attributes for different kinds of consumers + ExtraAttrs map[string]interface{} `json:"extra_attrs"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` +} + +// Metrics for tasks +type Metrics struct { + TaskCount int64 `json:"task_count"` + SuccessTaskCount int64 `json:"success_task_count"` + ErrorTaskCount int64 `json:"error_task_count"` + PendingTaskCount int64 `json:"pending_task_count"` + RunningTaskCount int64 `json:"running_task_count"` + ScheduledTaskCount int64 `json:"scheduled_task_count"` + StoppedTaskCount int64 `json:"stopped_task_count"` +} + +// Task is the unit for running. It stores the jobservice job records and related information +type Task struct { + ID int64 `json:"id"` + ExecutionID int64 `json:"execution_id"` + Status string `json:"status"` + // the detail message to explain the status in some cases. e.g. + // When the job is failed to submit to jobservice, this field can be used to explain the reason + StatusMessage string `json:"status_message"` + // the underlying job may retry several times + RunCount int `json:"run_count"` + // the customized attributes for different kinds of consumers + ExtraAttrs map[string]interface{} `json:"extra_attrs"` + // the time that the task record created + CreationTime time.Time `json:"creation_time"` + // the time that the underlying job starts + StartTime time.Time `json:"start_time"` + UpdateTime time.Time `json:"update_time"` + EndTime time.Time `json:"end_time"` +} + +// Job is the model represents the requested jobservice job +type Job struct { + Name string + Parameters job.Parameters + Metadata *job.Metadata +} diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go new file mode 100644 index 000000000..deee43c4d --- /dev/null +++ b/src/pkg/task/task.go @@ -0,0 +1,39 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + + "github.com/goharbor/harbor/src/lib/q" +) + +// Manager manages tasks. +// The execution and task managers provide an execution-task model to abstract the interactive with jobservice. +// All of the operations with jobservice should be delegated by them +type Manager interface { + // Create submits the job to jobservice and creates a corresponding task record. + // An execution must be created first and the task will be linked to it. + // The "extraAttrs" can be used to set the customized attributes + Create(ctx context.Context, executionID int64, job *Job, extraAttrs ...map[string]interface{}) (id int64, err error) + // Stop the specified task + Stop(ctx context.Context, id int64) (err error) + // Get the specified task + Get(ctx context.Context, id int64) (task *Task, err error) + // List the tasks according to the query + List(ctx context.Context, query *q.Query) (tasks []*Task, err error) + // Get the log of the specified task + GetLog(ctx context.Context, id int64) (log []byte, err error) +}