Fix bug of replication

1. check the disable/enable status before starting the replication
2. process the support_namespace property

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-04-11 14:19:39 +08:00
parent 3c093c8f3f
commit bc0123662b
13 changed files with 132 additions and 81 deletions

View File

@ -376,6 +376,7 @@ func process(info *model.RegistryInfo) *model.RegistryInfo {
in := &model.RegistryInfo{
Type: info.Type,
Description: info.Description,
SupportNamespace: info.SupportNamespace,
SupportedTriggers: info.SupportedTriggers,
}
filters := []*model.FilterStyle{}

View File

@ -19,12 +19,10 @@ import (
"net/http"
"strconv"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// ReplicationOperationAPI handles the replication operation requests
@ -111,12 +109,14 @@ func (r *ReplicationOperationAPI) CreateExecution() {
r.HandleInternalServerError(fmt.Sprintf("failed to get policy %d: %v", execution.PolicyID, err))
return
}
if policy == nil {
r.HandleNotFound(fmt.Sprintf("policy %d not found", execution.PolicyID))
return
}
if !policy.Enabled {
r.HandleBadRequest(fmt.Sprintf("the policy %d is disabled", execution.PolicyID))
return
}
if err = event.PopulateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", execution.PolicyID, err))
return

View File

@ -83,7 +83,18 @@ func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy
func (f *fakedPolicyManager) Get(id int64) (*model.Policy, error) {
if id == 1 {
return &model.Policy{
ID: 1,
ID: 1,
Enabled: true,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
}, nil
}
if id == 2 {
return &model.Policy{
ID: 2,
Enabled: false,
SrcRegistry: &model.Registry{
ID: 1,
},
@ -100,7 +111,7 @@ func (f *fakedPolicyManager) GetByName(name string) (*model.Policy, error) {
}
return nil, nil
}
func (f *fakedPolicyManager) Update(*model.Policy, ...string) error {
func (f *fakedPolicyManager) Update(*model.Policy) error {
return nil
}
func (f *fakedPolicyManager) Remove(int64) error {
@ -183,12 +194,24 @@ func TestCreateExecution(t *testing.T) {
method: http.MethodPost,
url: "/api/replication/executions",
bodyJSON: &models.Execution{
PolicyID: 2,
PolicyID: 3,
},
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 400
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/replication/executions",
bodyJSON: &models.Execution{
PolicyID: 2,
},
credential: sysAdmin,
},
code: http.StatusBadRequest,
},
// 201
{
request: &testingRequest{

View File

@ -246,7 +246,7 @@ func TestReplicationPolicyAPIGet(t *testing.T) {
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/policies/2",
url: "/api/replication/policies/3",
credential: sysAdmin,
},
code: http.StatusNotFound,
@ -296,7 +296,7 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
{
request: &testingRequest{
method: http.MethodPut,
url: "/api/replication/policies/2",
url: "/api/replication/policies/3",
credential: sysAdmin,
bodyJSON: &model.Policy{},
},
@ -398,7 +398,7 @@ func TestReplicationPolicyAPIDelete(t *testing.T) {
{
request: &testingRequest{
method: http.MethodDelete,
url: "/api/replication/policies/2",
url: "/api/replication/policies/3",
credential: sysAdmin,
},
code: http.StatusNotFound,

View File

@ -101,10 +101,11 @@ func (a *adapter) Info() (*model.RegistryInfo, error) {
Type: model.FilterTypeTag,
Style: model.FilterStyleTypeText,
},
{
Type: model.FilterTypeLabel,
Style: model.FilterStyleTypeText,
},
// TODO add support for label filter
// {
// Type: model.FilterTypeLabel,
// Style: model.FilterStyleTypeText,
// },
},
SupportedTriggers: []model.TriggerType{
model.TriggerTypeManual,

View File

@ -41,7 +41,7 @@ func TestInfo(t *testing.T) {
info, err := adapter.Info()
require.Nil(t, err)
assert.Equal(t, model.RegistryTypeHarbor, info.Type)
assert.Equal(t, 3, len(info.SupportedResourceFilters))
assert.Equal(t, 2, len(info.SupportedResourceFilters))
assert.Equal(t, 3, len(info.SupportedTriggers))
assert.Equal(t, 2, len(info.SupportedResourceTypes))
assert.Equal(t, model.ResourceTypeRepository, info.SupportedResourceTypes[0])
@ -64,7 +64,7 @@ func TestInfo(t *testing.T) {
info, err = adapter.Info()
require.Nil(t, err)
assert.Equal(t, model.RegistryTypeHarbor, info.Type)
assert.Equal(t, 3, len(info.SupportedResourceFilters))
assert.Equal(t, 2, len(info.SupportedResourceFilters))
assert.Equal(t, 3, len(info.SupportedTriggers))
assert.Equal(t, 1, len(info.SupportedResourceTypes))
assert.Equal(t, model.ResourceTypeRepository, info.SupportedResourceTypes[0])

View File

@ -4,16 +4,17 @@ import "time"
// RepPolicy is the model for a ng replication policy.
type RepPolicy struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
Name string `orm:"column(name)" json:"name"`
Description string `orm:"column(description)" json:"description"`
Creator string `orm:"column(creator)" json:"creator"`
SrcRegistryID int64 `orm:"column(src_registry_id)" json:"src_registry_id"`
SrcNamespaces string `orm:"column(src_namespaces)" json:"src_namespaces"`
DestRegistryID int64 `orm:"column(dest_registry_id)" json:"dest_registry_id"`
DestNamespace string `orm:"column(dest_namespace)" json:"dest_namespace"`
Override bool `orm:"column(override)" json:"override"`
Enabled bool `orm:"column(enabled)" json:"enabled"`
ID int64 `orm:"pk;auto;column(id)" json:"id"`
Name string `orm:"column(name)" json:"name"`
Description string `orm:"column(description)" json:"description"`
Creator string `orm:"column(creator)" json:"creator"`
SrcRegistryID int64 `orm:"column(src_registry_id)" json:"src_registry_id"`
SrcNamespaces string `orm:"column(src_namespaces)" json:"src_namespaces"`
DestRegistryID int64 `orm:"column(dest_registry_id)" json:"dest_registry_id"`
DestNamespace string `orm:"column(dest_namespace)" json:"dest_namespace"`
Override bool `orm:"column(override)" json:"override"`
Enabled bool `orm:"column(enabled)" json:"enabled"`
// TODO rename the db column to trigger
Trigger string `orm:"column(cron_str)" json:"trigger"`
Filters string `orm:"column(filters)" json:"filters"`
ReplicateDeletion bool `orm:"column(replicate_deletion)" json:"replicate_deletion"`

View File

@ -99,7 +99,7 @@ func (f *fakedPolicyController) Get(id int64) (*model.Policy, error) {
func (f *fakedPolicyController) GetByName(name string) (*model.Policy, error) {
return nil, nil
}
func (f *fakedPolicyController) Update(*model.Policy, ...string) error {
func (f *fakedPolicyController) Update(*model.Policy) error {
return nil
}
func (f *fakedPolicyController) Remove(int64) error {

View File

@ -20,7 +20,6 @@ import (
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
@ -59,6 +58,8 @@ type controller struct {
}
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
// only support one tag if the resource is specified as we append the tag name as a filter
// when creating the flow in function "createFlow"
if resource != nil && len(resource.Metadata.Vtags) != 1 {
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
}
@ -72,7 +73,8 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso
flow := c.createFlow(id, policy, resource)
if n, err := c.flowCtl.Start(flow); err != nil {
// just update the status text, the status will be updated automatically
// only update the execution when got error.
// if got no error, it will be updated automatically
// when listing the execution records
if e := c.executionMgr.Update(&models.Execution{
ID: id,
@ -95,8 +97,8 @@ func (c *controller) createFlow(executionID int64, policy *model.Policy, resourc
if resource != nil && resource.Deleted {
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, []*model.Resource{resource})
}
// copy only one resource, add extra filters to the policy to make sure
// only the resource will be filtered out
// copy only one resource, add extra filters to the policy to make sure
// only the specified resource will be filtered out
if resource != nil {
filters := []*model.Filter{
{

View File

@ -28,9 +28,8 @@ type Controller interface {
Get(int64) (*model.Policy, error)
// Get policy by the name
GetByName(string) (*model.Policy, error)
// Update the specified policy, the "props" are the properties of policy
// that need to be updated
Update(policy *model.Policy, props ...string) error
// Update the specified policy
Update(policy *model.Policy) error
// Remove the specified policy
Remove(int64) error
}

View File

@ -57,7 +57,7 @@ func (c *controller) Create(policy *model.Policy) (int64, error) {
return id, nil
}
func (c *controller) Update(policy *model.Policy, props ...string) error {
func (c *controller) Update(policy *model.Policy) error {
origin, err := c.Controller.Get(policy.ID)
if err != nil {
return err
@ -66,8 +66,8 @@ func (c *controller) Update(policy *model.Policy, props ...string) error {
return fmt.Errorf("policy %d not found", policy.ID)
}
// if no need to reschedule the policy, just update it
if !isScheduleTriggerChanged(origin, policy, props...) {
return c.Controller.Update(policy, props...)
if !isScheduleTriggerChanged(origin, policy) {
return c.Controller.Update(policy)
}
// need to reschedule the policy
// unschedule first if needed
@ -77,7 +77,7 @@ func (c *controller) Update(policy *model.Policy, props ...string) error {
}
}
// update the policy
if err = c.Controller.Update(policy, props...); err != nil {
if err = c.Controller.Update(policy); err != nil {
return err
}
// schedule again if needed
@ -109,27 +109,16 @@ func isScheduledTrigger(policy *model.Policy) bool {
if policy == nil {
return false
}
if !policy.Enabled {
return false
}
if policy.Trigger == nil {
return false
}
return policy.Trigger.Type == model.TriggerTypeScheduled
}
func isScheduleTriggerChanged(origin, current *model.Policy, props ...string) bool {
// doesn't update the trigger property
if len(props) > 0 {
found := false
for _, prop := range props {
if prop == "Trigger" || prop == "cron_str" {
found = true
break
}
}
if !found {
return false
}
}
func isScheduleTriggerChanged(origin, current *model.Policy) bool {
o := isScheduledTrigger(origin)
c := isScheduledTrigger(current)
// both triggers are not scheduled

View File

@ -40,7 +40,7 @@ func (f *fakedPolicyController) Get(id int64) (*model.Policy, error) {
func (f *fakedPolicyController) GetByName(name string) (*model.Policy, error) {
return nil, nil
}
func (f *fakedPolicyController) Update(*model.Policy, ...string) error {
func (f *fakedPolicyController) Update(*model.Policy) error {
return nil
}
func (f *fakedPolicyController) Remove(int64) error {
@ -71,14 +71,24 @@ func TestIsScheduledTrigger(t *testing.T) {
policy: nil,
expected: false,
},
// policy is disabled
{
policy: &model.Policy{
Enabled: false,
},
expected: false,
},
// trigger is nil
{
policy: &model.Policy{},
policy: &model.Policy{
Enabled: true,
},
expected: false,
},
// trigger type isn't scheduled
{
policy: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
@ -88,6 +98,7 @@ func TestIsScheduledTrigger(t *testing.T) {
// trigger type is scheduled
{
policy: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
},
@ -104,34 +115,28 @@ func TestIsScheduleTriggerChanged(t *testing.T) {
cases := []struct {
origin *model.Policy
current *model.Policy
props []string
expected bool
}{
// props contains no trigger field
{
props: []string{"name"},
expected: false,
},
// both triggers are not scheduled
{
origin: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
current: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
props: []string{"Trigger"},
expected: false,
},
// both triggers are scheduled and the crons are not same
{
origin: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -140,6 +145,7 @@ func TestIsScheduleTriggerChanged(t *testing.T) {
},
},
current: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -147,13 +153,12 @@ func TestIsScheduleTriggerChanged(t *testing.T) {
},
},
},
props: []string{"Trigger"},
expected: true,
},
// both triggers are scheduled and the crons are same
{
origin: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -162,6 +167,7 @@ func TestIsScheduleTriggerChanged(t *testing.T) {
},
},
current: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -169,13 +175,12 @@ func TestIsScheduleTriggerChanged(t *testing.T) {
},
},
},
props: []string{"Trigger"},
expected: false,
},
// one trigger is scheduled but the other one isn't
{
origin: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -184,16 +189,39 @@ func TestIsScheduleTriggerChanged(t *testing.T) {
},
},
current: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
props: []string{"Trigger"},
expected: true,
},
// one trigger is scheduled but disabled and
// the other one is scheduled but enabled
{
origin: &model.Policy{
Enabled: false,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
},
current: &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
},
expected: true,
},
}
for _, c := range cases {
assert.Equal(t, c.expected, isScheduleTriggerChanged(c.origin, c.current, c.props...))
assert.Equal(t, c.expected, isScheduleTriggerChanged(c.origin, c.current))
}
}
@ -211,6 +239,7 @@ func TestCreate(t *testing.T) {
// scheduled trigger
_, err = ctl.Create(&model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -233,25 +262,28 @@ func TestUpdate(t *testing.T) {
var origin, current *model.Policy
// origin policy is nil
current = &model.Policy{
ID: 1,
ID: 1,
Enabled: true,
}
err := ctl.Update(current)
assert.NotNil(t, err)
// the trigger doesn't change
origin = &model.Policy{
ID: 1,
ID: 1,
Enabled: true,
}
c.policy = origin
current = origin
err = ctl.Update(current, "Trigger")
err = ctl.Update(current)
require.Nil(t, err)
assert.False(t, scheduler.scheduled)
assert.False(t, scheduler.unscheduled)
// the trigger changed
origin = &model.Policy{
ID: 1,
ID: 1,
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -261,6 +293,7 @@ func TestUpdate(t *testing.T) {
}
c.policy = origin
current = &model.Policy{
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
@ -268,7 +301,7 @@ func TestUpdate(t *testing.T) {
},
},
}
err = ctl.Update(current, "Trigger")
err = ctl.Update(current)
require.Nil(t, err)
assert.True(t, scheduler.unscheduled)
assert.True(t, scheduler.scheduled)
@ -288,7 +321,8 @@ func TestRemove(t *testing.T) {
// the trigger type isn't scheduled
policy := &model.Policy{
ID: 1,
ID: 1,
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
@ -300,7 +334,8 @@ func TestRemove(t *testing.T) {
// the trigger type is scheduled
policy = &model.Policy{
ID: 1,
ID: 1,
Enabled: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{

View File

@ -190,13 +190,13 @@ func (m *DefaultManager) GetByName(name string) (*model.Policy, error) {
}
// Update Update the specified policy
func (m *DefaultManager) Update(policy *model.Policy, props ...string) error {
func (m *DefaultManager) Update(policy *model.Policy) error {
updatePolicy, err := convertToPersistModel(policy)
if err != nil {
return err
}
return dao.UpdateRepPolicy(updatePolicy, props...)
return dao.UpdateRepPolicy(updatePolicy)
}
// Remove Remove the specified policy