Replication webhook support (#11179)

* replication webhook support

Signed-off-by: guanxiatao <guanxiatao@corp.netease.com>

* replication webhook support with ut fixed

Signed-off-by: guanxiatao <guanxiatao@corp.netease.com>
This commit is contained in:
Ted Guan 2020-03-23 18:45:58 +08:00 committed by GitHub
parent 168637a743
commit e49a247d3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 588 additions and 4 deletions

View File

@ -25,6 +25,7 @@ func init() {
notifier.Subscribe(event.TopicScanningFailed, &scan.Handler{})
notifier.Subscribe(event.TopicScanningCompleted, &scan.Handler{})
notifier.Subscribe(event.TopicDeleteArtifact, &scan.DelArtHandler{})
notifier.Subscribe(event.TopicReplication, &artifact.ReplicationHandler{})
// replication
notifier.Subscribe(event.TopicPushArtifact, &replication.Handler{})

View File

@ -0,0 +1,206 @@
package artifact
import (
"errors"
"fmt"
"strings"
"github.com/goharbor/harbor/src/api/event"
"github.com/goharbor/harbor/src/api/event/handler/util"
commonModels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notifier/model"
notifyModel "github.com/goharbor/harbor/src/pkg/notifier/model"
"github.com/goharbor/harbor/src/replication"
rpModel "github.com/goharbor/harbor/src/replication/model"
)
// ReplicationHandler preprocess replication event data
type ReplicationHandler struct {
}
// Handle ...
func (r *ReplicationHandler) Handle(value interface{}) error {
if !config.NotificationEnable() {
log.Debug("notification feature is not enabled")
return nil
}
rpEvent, ok := value.(*event.ReplicationEvent)
if !ok {
return errors.New("invalid replication event type")
}
if rpEvent == nil {
return fmt.Errorf("nil replication event")
}
payload, project, err := constructReplicationPayload(rpEvent)
if err != nil {
return err
}
policies, err := notification.PolicyMgr.GetRelatedPolices(project.ProjectID, rpEvent.EventType)
if err != nil {
log.Errorf("failed to find policy for %s event: %v", rpEvent.EventType, err)
return err
}
if len(policies) == 0 {
log.Debugf("cannot find policy for %s event: %v", rpEvent.EventType, rpEvent)
return nil
}
err = util.SendHookWithPolicies(policies, payload, rpEvent.EventType)
if err != nil {
return err
}
return nil
}
// IsStateful ...
func (r *ReplicationHandler) IsStateful() bool {
return false
}
func constructReplicationPayload(event *event.ReplicationEvent) (*model.Payload, *commonModels.Project, error) {
task, err := replication.OperationCtl.GetTask(event.ReplicationTaskID)
if err != nil {
log.Errorf("failed to get replication task %d: error: %v", event.ReplicationTaskID, err)
return nil, nil, err
}
if task == nil {
return nil, nil, fmt.Errorf("task %d not found with replication event", event.ReplicationTaskID)
}
execution, err := replication.OperationCtl.GetExecution(task.ExecutionID)
if err != nil {
log.Errorf("failed to get replication execution %d: error: %v", task.ExecutionID, err)
return nil, nil, err
}
if execution == nil {
return nil, nil, fmt.Errorf("execution %d not found with replication event", task.ExecutionID)
}
rpPolicy, err := replication.PolicyCtl.Get(execution.PolicyID)
if err != nil {
log.Errorf("failed to get replication policy %d: error: %v", execution.PolicyID, err)
return nil, nil, err
}
if rpPolicy == nil {
return nil, nil, fmt.Errorf("policy %d not found with replication event", execution.PolicyID)
}
var remoteRegID int64
if rpPolicy.SrcRegistry != nil && rpPolicy.SrcRegistry.ID > 0 {
remoteRegID = rpPolicy.SrcRegistry.ID
}
if rpPolicy.DestRegistry != nil && rpPolicy.DestRegistry.ID > 0 {
remoteRegID = rpPolicy.DestRegistry.ID
}
remoteRegistry, err := replication.RegistryMgr.Get(remoteRegID)
if err != nil {
log.Errorf("failed to get replication remoteRegistry registry %d: error: %v", remoteRegID, err)
return nil, nil, err
}
if remoteRegistry == nil {
return nil, nil, fmt.Errorf("registry %d not found with replication event", remoteRegID)
}
srcNamespace, srcNameAndTag := getMetadataFromResource(task.SrcResource)
destNamespace, destNameAndTag := getMetadataFromResource(task.DstResource)
extURL, err := config.ExtURL()
if err != nil {
log.Errorf("Error while reading external endpoint URL: %v", err)
}
hostname := strings.Split(extURL, ":")[0]
remoteRes := &model.ReplicationResource{
RegistryName: remoteRegistry.Name,
RegistryType: string(remoteRegistry.Type),
Endpoint: remoteRegistry.URL,
Namespace: srcNamespace,
}
ext, err := config.ExtEndpoint()
if err != nil {
log.Errorf("Error while reading external endpoint: %v", err)
}
localRes := &model.ReplicationResource{
RegistryType: string(rpModel.RegistryTypeHarbor),
Endpoint: ext,
Namespace: destNamespace,
}
payload := &notifyModel.Payload{
Type: event.EventType,
OccurAt: event.OccurAt.Unix(),
Operator: string(execution.Trigger),
EventData: &model.EventData{
Replication: &model.Replication{
HarborHostname: hostname,
JobStatus: event.Status,
Description: rpPolicy.Description,
PolicyCreator: rpPolicy.Creator,
ArtifactType: task.ResourceType,
AuthenticationType: string(remoteRegistry.Credential.Type),
OverrideMode: rpPolicy.Override,
TriggerType: string(execution.Trigger),
ExecutionTimestamp: execution.StartTime.Unix(),
},
},
}
var prjName, nameAndTag string
// remote(src) -> local harbor(dest)
if rpPolicy.SrcRegistry != nil {
payload.EventData.Replication.SrcResource = remoteRes
payload.EventData.Replication.DestResource = localRes
prjName = destNamespace
nameAndTag = destNameAndTag
}
// local harbor(src) -> remote(dest)
if rpPolicy.DestRegistry != nil {
payload.EventData.Replication.DestResource = remoteRes
payload.EventData.Replication.SrcResource = localRes
prjName = srcNamespace
nameAndTag = srcNameAndTag
}
if event.Status == string(job.SuccessStatus) {
succeedArtifact := &model.ArtifactInfo{
Type: task.ResourceType,
Status: task.Status,
NameAndTag: nameAndTag,
}
payload.EventData.Replication.SuccessfulArtifact = []*model.ArtifactInfo{succeedArtifact}
}
if event.Status == string(job.ErrorStatus) {
failedArtifact := &model.ArtifactInfo{
Type: task.ResourceType,
Status: task.Status,
NameAndTag: nameAndTag,
}
payload.EventData.Replication.FailedArtifact = []*model.ArtifactInfo{failedArtifact}
}
project, err := config.GlobalProjectMgr.Get(prjName)
if err != nil {
log.Errorf("failed to get project %s, error: %v", prjName, err)
return nil, nil, err
}
if project == nil {
return nil, nil, fmt.Errorf("project %s not found of replication event", prjName)
}
return payload, project, nil
}
func getMetadataFromResource(resource string) (namespace, nameAndTag string) {
meta := strings.Split(resource, "/")
return meta[0], meta[1]
}

View File

@ -0,0 +1,285 @@
package artifact
import (
"github.com/goharbor/harbor/src/api/event"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr/metamgr"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/replication"
daoModels "github.com/goharbor/harbor/src/replication/dao/models"
"github.com/goharbor/harbor/src/replication/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
type fakedNotificationPolicyMgr struct {
}
type fakedReplicationPolicyMgr struct {
}
type fakedReplicationMgr struct {
}
type fakedReplicationRegistryMgr struct {
}
type fakedProjectMgr struct {
}
func (f *fakedNotificationPolicyMgr) Create(*models.NotificationPolicy) (int64, error) {
return 0, nil
}
// List the policies, returns the policy list and error
func (f *fakedNotificationPolicyMgr) List(int64) ([]*models.NotificationPolicy, error) {
return nil, nil
}
// Get policy with specified ID
func (f *fakedNotificationPolicyMgr) Get(int64) (*models.NotificationPolicy, error) {
return nil, nil
}
// GetByNameAndProjectID get policy by the name and projectID
func (f *fakedNotificationPolicyMgr) GetByNameAndProjectID(string, int64) (*models.NotificationPolicy, error) {
return nil, nil
}
// Update the specified policy
func (f *fakedNotificationPolicyMgr) Update(*models.NotificationPolicy) error {
return nil
}
// Delete the specified policy
func (f *fakedNotificationPolicyMgr) Delete(int64) error {
return nil
}
// Test the specified policy
func (f *fakedNotificationPolicyMgr) Test(*models.NotificationPolicy) error {
return nil
}
// GetRelatedPolices get event type related policies in project
func (f *fakedNotificationPolicyMgr) GetRelatedPolices(int64, string) ([]*models.NotificationPolicy, error) {
return []*models.NotificationPolicy{
{
ID: 0,
},
}, nil
}
func (f *fakedReplicationMgr) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
return 0, nil
}
func (f *fakedReplicationMgr) StopReplication(int64) error {
return nil
}
func (f *fakedReplicationMgr) ListExecutions(...*daoModels.ExecutionQuery) (int64, []*daoModels.Execution, error) {
return 0, nil, nil
}
func (f *fakedReplicationMgr) GetExecution(int64) (*daoModels.Execution, error) {
return &daoModels.Execution{
PolicyID: 1,
Trigger: "manual",
}, nil
}
func (f *fakedReplicationMgr) ListTasks(...*daoModels.TaskQuery) (int64, []*daoModels.Task, error) {
return 0, nil, nil
}
func (f *fakedReplicationMgr) GetTask(int64) (*daoModels.Task, error) {
return &daoModels.Task{
ExecutionID: 1,
SrcResource: "library/alpine:[v1]",
DstResource: "gxt/alpine:[v1] ",
}, nil
}
func (f *fakedReplicationMgr) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
return nil
}
func (f *fakedReplicationMgr) GetTaskLog(int64) ([]byte, error) {
return nil, nil
}
// Create new policy
func (f *fakedReplicationPolicyMgr) Create(*model.Policy) (int64, error) {
return 0, nil
}
// List the policies, returns the total count, policy list and error
func (f *fakedReplicationPolicyMgr) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
return 0, nil, nil
}
// Get policy with specified ID
func (f *fakedReplicationPolicyMgr) Get(int64) (*model.Policy, error) {
return &model.Policy{
ID: 1,
SrcRegistry: &model.Registry{
ID: 0,
},
DestRegistry: &model.Registry{
ID: 0,
},
}, nil
}
// Get policy by the name
func (f *fakedReplicationPolicyMgr) GetByName(string) (*model.Policy, error) {
return nil, nil
}
// Update the specified policy
func (f *fakedReplicationPolicyMgr) Update(policy *model.Policy) error {
return nil
}
// Remove the specified policy
func (f *fakedReplicationPolicyMgr) Remove(int64) error {
return nil
}
// Add new registry
func (f *fakedReplicationRegistryMgr) Add(*model.Registry) (int64, error) {
return 0, nil
}
// List registries, returns total count, registry list and error
func (f *fakedReplicationRegistryMgr) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
// Get the specified registry
func (f *fakedReplicationRegistryMgr) Get(int64) (*model.Registry, error) {
return &model.Registry{
Type: "harbor",
Credential: &model.Credential{
Type: "local",
},
}, nil
}
// GetByName gets registry by name
func (f *fakedReplicationRegistryMgr) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
// Update the registry, the "props" are the properties of registry
// that need to be updated
func (f *fakedReplicationRegistryMgr) Update(registry *model.Registry, props ...string) error {
return nil
}
// Remove the registry with the specified ID
func (f *fakedReplicationRegistryMgr) Remove(int64) error {
return nil
}
// HealthCheck checks health status of all registries and update result in database
func (f *fakedReplicationRegistryMgr) HealthCheck() error {
return nil
}
func (f *fakedProjectMgr) Get(projectIDOrName interface{}) (*models.Project, error) {
return &models.Project{ProjectID: 1}, nil
}
func (f *fakedProjectMgr) Create(*models.Project) (int64, error) {
return 0, nil
}
func (f *fakedProjectMgr) Delete(projectIDOrName interface{}) error {
return nil
}
func (f *fakedProjectMgr) Update(projectIDOrName interface{}, project *models.Project) error {
return nil
}
func (f *fakedProjectMgr) List(query *models.ProjectQueryParam) (*models.ProjectQueryResult, error) {
return nil, nil
}
func (f *fakedProjectMgr) IsPublic(projectIDOrName interface{}) (bool, error) {
return true, nil
}
func (f *fakedProjectMgr) Exists(projectIDOrName interface{}) (bool, error) {
return false, nil
}
// get all public project
func (f *fakedProjectMgr) GetPublic() ([]*models.Project, error) {
return nil, nil
}
// if the project manager uses a metadata manager, return it, otherwise return nil
func (f *fakedProjectMgr) GetMetadataManager() metamgr.ProjectMetadataManager {
return nil
}
func TestReplicationHandler_Handle(t *testing.T) {
config.Init()
PolicyMgr := notification.PolicyMgr
execution := replication.OperationCtl
rpPolicy := replication.PolicyCtl
rpRegistry := replication.RegistryMgr
project := config.GlobalProjectMgr
defer func() {
notification.PolicyMgr = PolicyMgr
replication.OperationCtl = execution
replication.PolicyCtl = rpPolicy
replication.RegistryMgr = rpRegistry
config.GlobalProjectMgr = project
}()
notification.PolicyMgr = &fakedNotificationPolicyMgr{}
replication.OperationCtl = &fakedReplicationMgr{}
replication.PolicyCtl = &fakedReplicationPolicyMgr{}
replication.RegistryMgr = &fakedReplicationRegistryMgr{}
config.GlobalProjectMgr = &fakedProjectMgr{}
handler := &ReplicationHandler{}
type args struct {
data interface{}
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "ImagePreprocessHandler Want Error 1",
args: args{
data: "",
},
wantErr: true,
},
{
name: "ImagePreprocessHandler 1",
args: args{
data: &event.ReplicationEvent{
OccurAt: time.Now(),
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := handler.Handle(tt.args.data)
if tt.wantErr {
require.NotNil(t, err, "Error: %s", err)
return
}
assert.Nil(t, err)
})
}
}
func TestReplicationHandler_IsStateful(t *testing.T) {
handler := &ReplicationHandler{}
assert.False(t, handler.IsStateful())
}

View File

@ -0,0 +1,28 @@
package metadata
import (
"time"
event2 "github.com/goharbor/harbor/src/api/event"
"github.com/goharbor/harbor/src/pkg/notifier/event"
)
// ReplicationMetaData defines replication related event data
type ReplicationMetaData struct {
ReplicationTaskID int64
Status string
}
// Resolve replication metadata into replication event
func (r *ReplicationMetaData) Resolve(evt *event.Event) error {
data := &event2.ReplicationEvent{
ReplicationTaskID: r.ReplicationTaskID,
EventType: event2.TopicReplication,
OccurAt: time.Now(),
Status: r.Status,
}
evt.Topic = event2.TopicReplication
evt.Data = data
return nil
}

View File

@ -44,6 +44,7 @@ const (
TopicUploadChart = "UPLOAD_CHART"
TopicDownloadChart = "DOWNLOAD_CHART"
TopicDeleteChart = "DELETE_CHART"
TopicReplication = "REPLICATION"
)
// CreateProjectEvent is the creating project event
@ -264,3 +265,11 @@ type ImgResource struct {
Digest string
Tag string
}
// ReplicationEvent is replication related event data to publish
type ReplicationEvent struct {
EventType string
ReplicationTaskID int64
OccurAt time.Time
Status string
}

View File

@ -16,12 +16,12 @@ package dao
import (
"fmt"
"github.com/goharbor/harbor/src/common/models"
"net/url"
"os"
"strconv"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/models"
"github.com/golang-migrate/migrate"
_ "github.com/golang-migrate/migrate/database/postgres" // import pgsql driver for migrator
_ "github.com/golang-migrate/migrate/source/file" // import local file driver for migrator

View File

@ -383,6 +383,7 @@ func initSupportedEvents() map[string]struct{} {
event.TopicQuotaWarning,
event.TopicScanningFailed,
event.TopicScanningCompleted,
event.TopicReplication,
}
var supportedEventTypes = make(map[string]struct{})

View File

@ -154,11 +154,29 @@ func (h *Handler) HandleReplicationScheduleJob() {
// HandleReplicationTask handles the webhook of replication task
func (h *Handler) HandleReplicationTask() {
log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status)
if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus, h.revision); err != nil {
log.Errorf("failed to update the status of the replication task %d: %v", h.id, err)
h.SendInternalServerError(err)
return
}
// Trigger artifict webhook event only for JobFinished and JobError status
if h.status == models.JobFinished || h.status == models.JobError || h.status == models.JobStopped {
e := &event.Event{}
metaData := &metadata.ReplicationMetaData{
ReplicationTaskID: h.id,
Status: h.rawStatus,
}
if err := e.Build(metaData); err == nil {
if err := e.Publish(); err != nil {
log.Error(errors.Wrap(err, "replication job hook handler: event publish"))
}
} else {
log.Error(errors.Wrap(err, "replication job hook handler: event publish"))
}
}
}
// HandleRetentionTask handles the webhook of retention task

View File

@ -41,6 +41,7 @@ func Init() {
JobMgr = jobMgr.NewDefaultManager()
SupportedNotifyTypes = make(map[string]struct{})
initSupportedNotifyType(model.NotifyTypeHTTP, model.NotifyTypeSlack)
log.Info("notification initialization completed")

View File

@ -22,9 +22,10 @@ type Payload struct {
// EventData of notification event payload
type EventData struct {
Resources []*Resource `json:"resources"`
Repository *Repository `json:"repository"`
Custom map[string]string `json:"custom_attributes,omitempty"`
Resources []*Resource `json:"resources,omitempty"`
Repository *Repository `json:"repository,omitempty"`
Replication *Replication `json:"replication,omitempty"`
Custom map[string]string `json:"custom_attributes,omitempty"`
}
// Resource describe infos of resource triggered notification
@ -43,3 +44,37 @@ type Repository struct {
RepoFullName string `json:"repo_full_name"`
RepoType string `json:"repo_type"`
}
// Replication describes replication infos
type Replication struct {
HarborHostname string `json:"harbor_hostname,omitempty"`
JobStatus string `json:"job_status,omitempty"`
Description string `json:"description,omitempty"`
ArtifactType string `json:"artifact_type,omitempty"`
AuthenticationType string `json:"authentication_type,omitempty"`
OverrideMode bool `json:"override_mode,omitempty"`
TriggerType string `json:"trigger_type,omitempty"`
PolicyCreator string `json:"policy_creator,omitempty"`
ExecutionTimestamp int64 `json:"execution_timestamp,omitempty"`
SrcResource *ReplicationResource `json:"src_resource,omitempty"`
DestResource *ReplicationResource `json:"dest_resource,omitempty"`
SuccessfulArtifact []*ArtifactInfo `json:"successful_artifact,omitempty"`
FailedArtifact []*ArtifactInfo `json:"failed_artifact,omitempty"`
}
// ArtifactInfo describe info of artifact replicated
type ArtifactInfo struct {
Type string `json:"type"`
Status string `json:"status"`
NameAndTag string `json:"name_tag"`
FailReason string `json:"fail_reason,omitempty"`
}
// ReplicationResource describes replication resource info
type ReplicationResource struct {
RegistryName string `json:"registry_name,omitempty"`
RegistryType string `json:"registry_type"`
Endpoint string `json:"endpoint"`
Provider string `json:"provider,omitempty"`
Namespace string `json:"namespace,omitempty"`
}