mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-03 14:37:44 +01:00
Fix replication related issues
1. Add operation property for tasks 2. Add trigger property for executions 3. Update the getting registry info API to allow passing 0 as ID to get the info of local Harbor registry Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
18dbd807a4
commit
4484bca756
@ -85,6 +85,7 @@ create table replication_task (
|
||||
resource_type varchar(64),
|
||||
src_resource varchar(256),
|
||||
dst_resource varchar(256),
|
||||
operation varchar(32),
|
||||
job_id varchar(64),
|
||||
status varchar(32),
|
||||
start_time timestamp default CURRENT_TIMESTAMP,
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/event"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/api/models"
|
||||
@ -215,16 +217,27 @@ func (t *RegistryAPI) Delete() {
|
||||
|
||||
// GetInfo returns the base info and capability declarations of the registry
|
||||
func (t *RegistryAPI) GetInfo() {
|
||||
id := t.GetIDFromURL()
|
||||
registry, err := t.manager.Get(id)
|
||||
if err != nil {
|
||||
t.HandleInternalServerError(fmt.Sprintf("failed to get registry %d: %v", id, err))
|
||||
id, err := t.GetInt64FromPath(":id")
|
||||
// "0" is used for the ID of the local Harbor registry
|
||||
if err != nil || id < 0 {
|
||||
t.HandleBadRequest(fmt.Sprintf("invalid registry ID %s", t.GetString(":id")))
|
||||
return
|
||||
}
|
||||
if registry == nil {
|
||||
t.HandleNotFound(fmt.Sprintf("registry %d not found", id))
|
||||
return
|
||||
var registry *model.Registry
|
||||
if id == 0 {
|
||||
registry = event.GetLocalRegistry()
|
||||
} else {
|
||||
registry, err = t.manager.Get(id)
|
||||
if err != nil {
|
||||
t.HandleInternalServerError(fmt.Sprintf("failed to get registry %d: %v", id, err))
|
||||
return
|
||||
}
|
||||
if registry == nil {
|
||||
t.HandleNotFound(fmt.Sprintf("registry %d not found", id))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
factory, err := adapter.GetFactory(registry.Type)
|
||||
if err != nil {
|
||||
t.HandleInternalServerError(fmt.Sprintf("failed to get the adapter factory for registry type %s: %v", registry.Type, err))
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/event"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng"
|
||||
@ -120,7 +122,8 @@ func (r *ReplicationOperationAPI) CreateExecution() {
|
||||
return
|
||||
}
|
||||
|
||||
executionID, err := ng.OperationCtl.StartReplication(policy, nil)
|
||||
trigger := r.GetString("trigger", model.TriggerTypeManual)
|
||||
executionID, err := ng.OperationCtl.StartReplication(policy, nil, trigger)
|
||||
if err != nil {
|
||||
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
|
||||
return
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
|
||||
type fakedOperationController struct{}
|
||||
|
||||
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
|
||||
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedOperationController) StopReplication(int64) error {
|
||||
|
@ -135,7 +135,8 @@ func initRouters() {
|
||||
|
||||
beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post")
|
||||
beego.Router("/api/registries/:id([0-9]+)", &api.RegistryAPI{}, "get:Get;put:Put;delete:Delete")
|
||||
beego.Router("/api/registries/:id([0-9]+)/info", &api.RegistryAPI{}, "get:GetInfo")
|
||||
// we use "0" as the ID of the local Harbor registry, so don't add "([0-9]+)" in the path
|
||||
beego.Router("/api/registries/:id/info", &api.RegistryAPI{}, "get:GetInfo")
|
||||
|
||||
beego.Router("/v2/*", &controllers.RegistryProxy{}, "*:Handle")
|
||||
|
||||
|
@ -106,13 +106,13 @@ type TaskFieldsName struct {
|
||||
}
|
||||
|
||||
// Task represent the tasks in one execution.
|
||||
// TODO add operation property
|
||||
type Task struct {
|
||||
ID int64 `orm:"pk;auto;column(id)" json:"id"`
|
||||
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
|
||||
ResourceType string `orm:"column(resource_type)" json:"resource_type"`
|
||||
SrcResource string `orm:"column(src_resource)" json:"src_resource"`
|
||||
DstResource string `orm:"column(dst_resource)" json:"dst_resource"`
|
||||
Operation string `orm:"column(operation)" json:"operation"`
|
||||
JobID string `orm:"column(job_id)" json:"job_id"`
|
||||
Status string `orm:"column(status)" json:"status"`
|
||||
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
|
||||
|
@ -76,7 +76,7 @@ func (h *handler) Handle(event *Event) error {
|
||||
if err := PopulateRegistries(h.registryMgr, policy); err != nil {
|
||||
return err
|
||||
}
|
||||
id, err := h.opCtl.StartReplication(policy, event.Resource)
|
||||
id, err := h.opCtl.StartReplication(policy, event.Resource, model.TriggerTypeEventBased)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -140,7 +140,7 @@ func PopulateRegistries(registryMgr registry.Manager, policy *model.Policy) erro
|
||||
|
||||
func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model.Registry, error) {
|
||||
if registry == nil || registry.ID == 0 {
|
||||
return getLocalRegistry(), nil
|
||||
return GetLocalRegistry(), nil
|
||||
}
|
||||
reg, err := registryMgr.Get(registry.ID)
|
||||
if err != nil {
|
||||
@ -152,7 +152,8 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model
|
||||
return reg, nil
|
||||
}
|
||||
|
||||
func getLocalRegistry() *model.Registry {
|
||||
// GetLocalRegistry returns the info of the local Harbor registry
|
||||
func GetLocalRegistry() *model.Registry {
|
||||
return &model.Registry{
|
||||
Type: model.RegistryTypeHarbor,
|
||||
Name: "Local",
|
||||
|
@ -28,7 +28,7 @@ import (
|
||||
|
||||
type fakedOperationController struct{}
|
||||
|
||||
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
|
||||
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedOperationController) StopReplication(int64) error {
|
||||
|
@ -31,7 +31,8 @@ import (
|
||||
// Controller handles the replication-related operations: start,
|
||||
// stop, query, etc.
|
||||
type Controller interface {
|
||||
StartReplication(policy *model.Policy, resource *model.Resource) (int64, error)
|
||||
// trigger is used to specified that what this replication is triggered by
|
||||
StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error)
|
||||
StopReplication(int64) error
|
||||
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
|
||||
GetExecution(int64) (*models.Execution, error)
|
||||
@ -56,12 +57,14 @@ type controller struct {
|
||||
scheduler scheduler.Scheduler
|
||||
}
|
||||
|
||||
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
|
||||
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) {
|
||||
if resource != nil && len(resource.Metadata.Vtags) != 1 {
|
||||
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
|
||||
}
|
||||
|
||||
id, err := createExecution(c.executionMgr, policy.ID)
|
||||
if len(trigger) == 0 {
|
||||
trigger = model.TriggerTypeManual
|
||||
}
|
||||
id, err := createExecution(c.executionMgr, policy.ID, trigger)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -139,9 +142,10 @@ func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
|
||||
}
|
||||
|
||||
// create the execution record in database
|
||||
func createExecution(mgr execution.Manager, policyID int64) (int64, error) {
|
||||
func createExecution(mgr execution.Manager, policyID int64, trigger string) (int64, error) {
|
||||
id, err := mgr.Create(&models.Execution{
|
||||
PolicyID: policyID,
|
||||
Trigger: trigger,
|
||||
Status: models.ExecutionStatusInProgress,
|
||||
StartTime: time.Now(),
|
||||
})
|
||||
|
@ -232,24 +232,24 @@ func TestStartReplication(t *testing.T) {
|
||||
Vtags: []string{"1.0", "2.0"},
|
||||
},
|
||||
}
|
||||
_, err = ctl.StartReplication(policy, resource)
|
||||
_, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
|
||||
require.NotNil(t, err)
|
||||
|
||||
// replicate resource deletion
|
||||
resource.Metadata.Vtags = []string{"1.0"}
|
||||
resource.Deleted = true
|
||||
id, err := ctl.StartReplication(policy, resource)
|
||||
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), id)
|
||||
|
||||
// replicate resource copy
|
||||
resource.Deleted = false
|
||||
id, err = ctl.StartReplication(policy, resource)
|
||||
id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), id)
|
||||
|
||||
// nil resource
|
||||
id, err = ctl.StartReplication(policy, nil)
|
||||
id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), id)
|
||||
}
|
||||
|
@ -263,12 +263,17 @@ func preprocess(scheduler scheduler.Scheduler, srcResources, dstResources []*mod
|
||||
// create task records in database
|
||||
func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.ScheduleItem) error {
|
||||
for _, item := range items {
|
||||
operation := "copy"
|
||||
if item.DstResource.Deleted {
|
||||
operation = "deletion"
|
||||
}
|
||||
task := &models.Task{
|
||||
ExecutionID: executionID,
|
||||
Status: models.TaskStatusInitialized,
|
||||
ResourceType: string(item.SrcResource.Type),
|
||||
SrcResource: getResourceName(item.SrcResource),
|
||||
DstResource: getResourceName(item.DstResource),
|
||||
Operation: operation,
|
||||
}
|
||||
id, err := mgr.CreateTask(task)
|
||||
if err != nil {
|
||||
|
@ -28,7 +28,7 @@ type fakedOperationController struct {
|
||||
status string
|
||||
}
|
||||
|
||||
func (f *fakedOperationController) StartReplication(*model.Policy, *model.Resource) (int64, error) {
|
||||
func (f *fakedOperationController) StartReplication(*model.Policy, *model.Resource, string) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
func (f *fakedOperationController) StopReplication(int64) error {
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
|
||||
common_http "github.com/goharbor/harbor/src/common/http"
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
job_models "github.com/goharbor/harbor/src/common/job/models"
|
||||
@ -60,7 +62,7 @@ func (s *scheduler) Schedule(policyID int64, cron string) error {
|
||||
}
|
||||
log.Debugf("the schedule job record %d added", id)
|
||||
|
||||
replicateURL := fmt.Sprintf("%s/api/replication/executions", config.Config.CoreURL)
|
||||
replicateURL := fmt.Sprintf("%s/api/replication/executions?trigger=%s", config.Config.CoreURL, model.TriggerTypeScheduled)
|
||||
statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/schedule/%d", config.Config.CoreURL, id)
|
||||
jobID, err := s.jobservice.SubmitJob(&job_models.JobData{
|
||||
Name: job.Scheduler,
|
||||
|
Loading…
Reference in New Issue
Block a user