mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-21 06:11:45 +01:00
Merge pull request #3644 from ywk253100/171120_immediately_trigger
Setup/Unset trigger when CURD policies
This commit is contained in:
commit
b85da9679c
@ -91,22 +91,61 @@ func (ctl *Controller) Init() error {
|
|||||||
|
|
||||||
//CreatePolicy is used to create a new policy and enable it if necessary
|
//CreatePolicy is used to create a new policy and enable it if necessary
|
||||||
func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) (int64, error) {
|
func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) (int64, error) {
|
||||||
//Validate policy
|
id, err := ctl.policyManager.CreatePolicy(newPolicy)
|
||||||
// TODO
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
return ctl.policyManager.CreatePolicy(newPolicy)
|
if err = ctl.triggerManager.SetupTrigger(id, *newPolicy.Trigger); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//UpdatePolicy will update the policy with new content.
|
//UpdatePolicy will update the policy with new content.
|
||||||
//Parameter updatedPolicy must have the ID of the updated policy.
|
//Parameter updatedPolicy must have the ID of the updated policy.
|
||||||
func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error {
|
func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error {
|
||||||
// TODO check pre-conditions
|
// TODO check pre-conditions
|
||||||
return ctl.policyManager.UpdatePolicy(updatedPolicy)
|
|
||||||
|
id := updatedPolicy.ID
|
||||||
|
originPolicy, err := ctl.policyManager.GetPolicy(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if originPolicy.ID == 0 {
|
||||||
|
return fmt.Errorf("policy %d not found", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = ctl.policyManager.UpdatePolicy(updatedPolicy); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctl.triggerManager.SetupTrigger(id, *updatedPolicy.Trigger)
|
||||||
}
|
}
|
||||||
|
|
||||||
//RemovePolicy will remove the specified policy and clean the related settings
|
//RemovePolicy will remove the specified policy and clean the related settings
|
||||||
func (ctl *Controller) RemovePolicy(policyID int64) error {
|
func (ctl *Controller) RemovePolicy(policyID int64) error {
|
||||||
// TODO check pre-conditions
|
// TODO check pre-conditions
|
||||||
|
|
||||||
|
policy, err := ctl.policyManager.GetPolicy(policyID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if policy.ID == 0 {
|
||||||
|
return fmt.Errorf("policy %d not found", policyID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = ctl.triggerManager.UnsetTrigger(policyID, *policy.Trigger); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return ctl.policyManager.RemovePolicy(policyID)
|
return ctl.policyManager.RemovePolicy(policyID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,6 +161,9 @@ func (ctl *Controller) GetPolicies(query models.QueryParameter) ([]models.Replic
|
|||||||
|
|
||||||
//Replicate starts one replication defined in the specified policy;
|
//Replicate starts one replication defined in the specified policy;
|
||||||
//Can be launched by the API layer and related triggers.
|
//Can be launched by the API layer and related triggers.
|
||||||
func (ctl *Controller) Replicate(policyID int64) error {
|
func (ctl *Controller) Replicate(policyID int64, item ...*models.FilterItem) error {
|
||||||
|
|
||||||
|
fmt.Printf("replicating %d ...\n", policyID)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -43,5 +43,5 @@ func (st *ImmediateTrigger) Setup() error {
|
|||||||
|
|
||||||
//Unset is the implementation of same method defined in Trigger interface
|
//Unset is the implementation of same method defined in Trigger interface
|
||||||
func (st *ImmediateTrigger) Unset() error {
|
func (st *ImmediateTrigger) Unset() error {
|
||||||
return errors.New("Not implemented")
|
return DefaultWatchList.Remove(st.params.PolicyID)
|
||||||
}
|
}
|
||||||
|
@ -12,17 +12,18 @@ import (
|
|||||||
//with json format.
|
//with json format.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
//Cache for triggers
|
//Cache for triggers
|
||||||
cache *Cache
|
//cache *Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewManager is the constructor of trigger manager.
|
//NewManager is the constructor of trigger manager.
|
||||||
//capacity is the max number of trigger references manager can keep in memory
|
//capacity is the max number of trigger references manager can keep in memory
|
||||||
func NewManager(capacity int) *Manager {
|
func NewManager(capacity int) *Manager {
|
||||||
return &Manager{
|
return &Manager{
|
||||||
cache: NewCache(capacity),
|
//cache: NewCache(capacity),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
//GetTrigger returns the enabled trigger reference if existing in the cache.
|
//GetTrigger returns the enabled trigger reference if existing in the cache.
|
||||||
func (m *Manager) GetTrigger(policyID int64) Interface {
|
func (m *Manager) GetTrigger(policyID int64) Interface {
|
||||||
return m.cache.Get(policyID)
|
return m.cache.Get(policyID)
|
||||||
@ -47,6 +48,7 @@ func (m *Manager) RemoveTrigger(policyID int64) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
//SetupTrigger will create the new trigger based on the provided json parameters.
|
//SetupTrigger will create the new trigger based on the provided json parameters.
|
||||||
//If failed, an error will be returned.
|
//If failed, an error will be returned.
|
||||||
|
@ -28,16 +28,11 @@ func (wl *WatchList) Add(item WatchItem) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Remove the specified watch item from list
|
//Remove the specified watch item from list
|
||||||
func (wl *WatchList) Remove() WatchItem {
|
func (wl *WatchList) Remove(policyID int64) error {
|
||||||
return WatchItem{}
|
|
||||||
}
|
|
||||||
|
|
||||||
//Update the watch item in the list
|
|
||||||
func (wl *WatchList) Update(updatedItem WatchItem) error {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//Get the specified watch item
|
//Get the watch items according to the namespace and operation
|
||||||
func (wl *WatchList) Get(namespace string) WatchItem {
|
func (wl *WatchList) Get(namespace, operation string) ([]WatchItem, error) {
|
||||||
return WatchItem{}
|
return []WatchItem{}, nil
|
||||||
}
|
}
|
||||||
|
@ -255,7 +255,7 @@ func (ra *RepositoryAPI) Delete() {
|
|||||||
}
|
}
|
||||||
log.Infof("delete tag: %s:%s", repoName, t)
|
log.Infof("delete tag: %s:%s", repoName, t)
|
||||||
|
|
||||||
go TriggerReplicationByRepository(project.ProjectID, repoName, []string{t}, models.RepOpDelete)
|
go CheckAndTriggerReplication(repoName+":"+t, "delete")
|
||||||
|
|
||||||
go func(tag string) {
|
go func(tag string) {
|
||||||
if err := dao.AddAccessLog(models.AccessLog{
|
if err := dao.AddAccessLog(models.AccessLog{
|
||||||
|
@ -32,6 +32,10 @@ import (
|
|||||||
"github.com/vmware/harbor/src/common/utils/log"
|
"github.com/vmware/harbor/src/common/utils/log"
|
||||||
"github.com/vmware/harbor/src/common/utils/registry"
|
"github.com/vmware/harbor/src/common/utils/registry"
|
||||||
"github.com/vmware/harbor/src/common/utils/registry/auth"
|
"github.com/vmware/harbor/src/common/utils/registry/auth"
|
||||||
|
"github.com/vmware/harbor/src/replication"
|
||||||
|
"github.com/vmware/harbor/src/replication/core"
|
||||||
|
rep_models "github.com/vmware/harbor/src/replication/models"
|
||||||
|
"github.com/vmware/harbor/src/replication/trigger"
|
||||||
"github.com/vmware/harbor/src/ui/config"
|
"github.com/vmware/harbor/src/ui/config"
|
||||||
"github.com/vmware/harbor/src/ui/promgr"
|
"github.com/vmware/harbor/src/ui/promgr"
|
||||||
"github.com/vmware/harbor/src/ui/service/token"
|
"github.com/vmware/harbor/src/ui/service/token"
|
||||||
@ -77,7 +81,41 @@ func checkUserExists(name string) int {
|
|||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckAndTriggerReplication checks whether replication policy is set
|
||||||
|
// on the resource, if is, trigger the replication
|
||||||
|
func CheckAndTriggerReplication(image, operation string) {
|
||||||
|
project, _ := utils.ParseRepository(image)
|
||||||
|
watchItems, err := trigger.DefaultWatchList.Get(project, operation)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get watch list for resource %s, operation %s: %v", image, operation, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(watchItems) == 0 {
|
||||||
|
log.Debugf("no replication should be triggered for resource %s, operation %s, skip", image, operation)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, watchItem := range watchItems {
|
||||||
|
// TODO define a new type ReplicationItem to wrap FilterItem and operation.
|
||||||
|
// Maybe change the FilterItem to interface and define a type Resource to
|
||||||
|
// implement FilterItem is better?
|
||||||
|
item := &rep_models.FilterItem{
|
||||||
|
Kind: replication.FilterItemKindTag,
|
||||||
|
Value: image,
|
||||||
|
Metadata: map[string]interface{}{
|
||||||
|
"operation": operation,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := core.DefaultController.Replicate(watchItem.PolicyID, item); err != nil {
|
||||||
|
log.Errorf("failed to trigger replication for resource: %s, operation: %s: %v", image, operation, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Infof("replication for resource: %s, operation: %s triggered", image, operation)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TriggerReplication triggers the replication according to the policy
|
// TriggerReplication triggers the replication according to the policy
|
||||||
|
// TODO remove
|
||||||
func TriggerReplication(policyID int64, repository string,
|
func TriggerReplication(policyID int64, repository string,
|
||||||
tags []string, operation string) error {
|
tags []string, operation string) error {
|
||||||
data := struct {
|
data := struct {
|
||||||
@ -101,26 +139,7 @@ func TriggerReplication(policyID int64, repository string,
|
|||||||
return uiutils.RequestAsUI("POST", url, bytes.NewBuffer(b), uiutils.NewStatusRespHandler(http.StatusOK))
|
return uiutils.RequestAsUI("POST", url, bytes.NewBuffer(b), uiutils.NewStatusRespHandler(http.StatusOK))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TriggerReplicationByRepository triggers the replication according to the repository
|
// TODO remove
|
||||||
func TriggerReplicationByRepository(projectID int64, repository string, tags []string, operation string) {
|
|
||||||
policies, err := dao.GetRepPolicyByProject(projectID)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to get policies for repository %s: %v", repository, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, policy := range policies {
|
|
||||||
if policy.Enabled == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := TriggerReplication(policy.ID, repository, tags, operation); err != nil {
|
|
||||||
log.Errorf("failed to trigger replication of policy %d for %s: %v", policy.ID, repository, err)
|
|
||||||
} else {
|
|
||||||
log.Infof("replication of policy %d for %s triggered", policy.ID, repository)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func postReplicationAction(policyID int64, acton string) error {
|
func postReplicationAction(policyID int64, acton string) error {
|
||||||
data := struct {
|
data := struct {
|
||||||
PolicyID int64 `json:"policy_id"`
|
PolicyID int64 `json:"policy_id"`
|
||||||
|
@ -104,7 +104,7 @@ func (n *NotificationHandler) Post() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go api.TriggerReplicationByRepository(pro.ProjectID, repository, []string{tag}, models.RepOpTransfer)
|
go api.CheckAndTriggerReplication(repository+":"+tag, "push")
|
||||||
|
|
||||||
if autoScanEnabled(pro) {
|
if autoScanEnabled(pro) {
|
||||||
last, err := clairdao.GetLastUpdate()
|
last, err := clairdao.GetLastUpdate()
|
||||||
|
Loading…
Reference in New Issue
Block a user