Merge pull request #7272 from ywk253100/190402_trigger_operation

Fix replication related issues
This commit is contained in:
Wenkai Yin 2019-04-02 16:12:16 +08:00 committed by GitHub
commit 68803313d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 56 additions and 26 deletions

View File

@ -85,6 +85,7 @@ create table replication_task (
resource_type varchar(64),
src_resource varchar(256),
dst_resource varchar(256),
operation varchar(32),
job_id varchar(64),
status varchar(32),
start_time timestamp default CURRENT_TIMESTAMP,

View File

@ -5,6 +5,8 @@ import (
"net/http"
"strconv"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api/models"
@ -215,16 +217,27 @@ func (t *RegistryAPI) Delete() {
// GetInfo returns the base info and capability declarations of the registry
func (t *RegistryAPI) GetInfo() {
id := t.GetIDFromURL()
registry, err := t.manager.Get(id)
if err != nil {
t.HandleInternalServerError(fmt.Sprintf("failed to get registry %d: %v", id, err))
id, err := t.GetInt64FromPath(":id")
// "0" is used for the ID of the local Harbor registry
if err != nil || id < 0 {
t.HandleBadRequest(fmt.Sprintf("invalid registry ID %s", t.GetString(":id")))
return
}
if registry == nil {
t.HandleNotFound(fmt.Sprintf("registry %d not found", id))
return
var registry *model.Registry
if id == 0 {
registry = event.GetLocalRegistry()
} else {
registry, err = t.manager.Get(id)
if err != nil {
t.HandleInternalServerError(fmt.Sprintf("failed to get registry %d: %v", id, err))
return
}
if registry == nil {
t.HandleNotFound(fmt.Sprintf("registry %d not found", id))
return
}
}
factory, err := adapter.GetFactory(registry.Type)
if err != nil {
t.HandleInternalServerError(fmt.Sprintf("failed to get the adapter factory for registry type %s: %v", registry.Type, err))

View File

@ -19,6 +19,8 @@ import (
"net/http"
"strconv"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng"
@ -120,7 +122,8 @@ func (r *ReplicationOperationAPI) CreateExecution() {
return
}
executionID, err := ng.OperationCtl.StartReplication(policy, nil)
trigger := r.GetString("trigger", model.TriggerTypeManual)
executionID, err := ng.OperationCtl.StartReplication(policy, nil, trigger)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
return

View File

@ -25,7 +25,7 @@ import (
type fakedOperationController struct{}
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) {
return 1, nil
}
func (f *fakedOperationController) StopReplication(int64) error {

View File

@ -135,7 +135,8 @@ func initRouters() {
beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post")
beego.Router("/api/registries/:id([0-9]+)", &api.RegistryAPI{}, "get:Get;put:Put;delete:Delete")
beego.Router("/api/registries/:id([0-9]+)/info", &api.RegistryAPI{}, "get:GetInfo")
// we use "0" as the ID of the local Harbor registry, so don't add "([0-9]+)" in the path
beego.Router("/api/registries/:id/info", &api.RegistryAPI{}, "get:GetInfo")
beego.Router("/v2/*", &controllers.RegistryProxy{}, "*:Handle")

View File

@ -106,13 +106,13 @@ type TaskFieldsName struct {
}
// Task represent the tasks in one execution.
// TODO add operation property
type Task struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
ResourceType string `orm:"column(resource_type)" json:"resource_type"`
SrcResource string `orm:"column(src_resource)" json:"src_resource"`
DstResource string `orm:"column(dst_resource)" json:"dst_resource"`
Operation string `orm:"column(operation)" json:"operation"`
JobID string `orm:"column(job_id)" json:"job_id"`
Status string `orm:"column(status)" json:"status"`
StartTime time.Time `orm:"column(start_time)" json:"start_time"`

View File

@ -76,7 +76,7 @@ func (h *handler) Handle(event *Event) error {
if err := PopulateRegistries(h.registryMgr, policy); err != nil {
return err
}
id, err := h.opCtl.StartReplication(policy, event.Resource)
id, err := h.opCtl.StartReplication(policy, event.Resource, model.TriggerTypeEventBased)
if err != nil {
return err
}
@ -140,7 +140,7 @@ func PopulateRegistries(registryMgr registry.Manager, policy *model.Policy) erro
func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model.Registry, error) {
if registry == nil || registry.ID == 0 {
return getLocalRegistry(), nil
return GetLocalRegistry(), nil
}
reg, err := registryMgr.Get(registry.ID)
if err != nil {
@ -152,7 +152,8 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model
return reg, nil
}
func getLocalRegistry() *model.Registry {
// GetLocalRegistry returns the info of the local Harbor registry
func GetLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",

View File

@ -28,7 +28,7 @@ import (
type fakedOperationController struct{}
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) {
return 1, nil
}
func (f *fakedOperationController) StopReplication(int64) error {

View File

@ -31,7 +31,8 @@ import (
// Controller handles the replication-related operations: start,
// stop, query, etc.
type Controller interface {
StartReplication(policy *model.Policy, resource *model.Resource) (int64, error)
// trigger is used to specified that what this replication is triggered by
StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error)
StopReplication(int64) error
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
GetExecution(int64) (*models.Execution, error)
@ -56,12 +57,14 @@ type controller struct {
scheduler scheduler.Scheduler
}
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) {
if resource != nil && len(resource.Metadata.Vtags) != 1 {
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
}
id, err := createExecution(c.executionMgr, policy.ID)
if len(trigger) == 0 {
trigger = model.TriggerTypeManual
}
id, err := createExecution(c.executionMgr, policy.ID, trigger)
if err != nil {
return 0, err
}
@ -139,9 +142,10 @@ func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
}
// create the execution record in database
func createExecution(mgr execution.Manager, policyID int64) (int64, error) {
func createExecution(mgr execution.Manager, policyID int64, trigger string) (int64, error) {
id, err := mgr.Create(&models.Execution{
PolicyID: policyID,
Trigger: trigger,
Status: models.ExecutionStatusInProgress,
StartTime: time.Now(),
})

View File

@ -232,24 +232,24 @@ func TestStartReplication(t *testing.T) {
Vtags: []string{"1.0", "2.0"},
},
}
_, err = ctl.StartReplication(policy, resource)
_, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.NotNil(t, err)
// replicate resource deletion
resource.Metadata.Vtags = []string{"1.0"}
resource.Deleted = true
id, err := ctl.StartReplication(policy, resource)
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
// replicate resource copy
resource.Deleted = false
id, err = ctl.StartReplication(policy, resource)
id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
// nil resource
id, err = ctl.StartReplication(policy, nil)
id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
}

View File

@ -263,12 +263,17 @@ func preprocess(scheduler scheduler.Scheduler, srcResources, dstResources []*mod
// create task records in database
func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.ScheduleItem) error {
for _, item := range items {
operation := "copy"
if item.DstResource.Deleted {
operation = "deletion"
}
task := &models.Task{
ExecutionID: executionID,
Status: models.TaskStatusInitialized,
ResourceType: string(item.SrcResource.Type),
SrcResource: getResourceName(item.SrcResource),
DstResource: getResourceName(item.DstResource),
Operation: operation,
}
id, err := mgr.CreateTask(task)
if err != nil {

View File

@ -28,7 +28,7 @@ type fakedOperationController struct {
status string
}
func (f *fakedOperationController) StartReplication(*model.Policy, *model.Resource) (int64, error) {
func (f *fakedOperationController) StartReplication(*model.Policy, *model.Resource, string) (int64, error) {
return 0, nil
}
func (f *fakedOperationController) StopReplication(int64) error {

View File

@ -19,6 +19,8 @@ import (
"net/http"
"time"
"github.com/goharbor/harbor/src/replication/ng/model"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/job"
job_models "github.com/goharbor/harbor/src/common/job/models"
@ -60,7 +62,7 @@ func (s *scheduler) Schedule(policyID int64, cron string) error {
}
log.Debugf("the schedule job record %d added", id)
replicateURL := fmt.Sprintf("%s/api/replication/executions", config.Config.CoreURL)
replicateURL := fmt.Sprintf("%s/api/replication/executions?trigger=%s", config.Config.CoreURL, model.TriggerTypeScheduled)
statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/schedule/%d", config.Config.CoreURL, id)
jobID, err := s.jobservice.SubmitJob(&job_models.JobData{
Name: job.Scheduler,