diff --git a/src/api/event/handler/init.go b/src/api/event/handler/init.go index b60cfbf31..5362a07bb 100644 --- a/src/api/event/handler/init.go +++ b/src/api/event/handler/init.go @@ -25,6 +25,7 @@ func init() { notifier.Subscribe(event.TopicScanningFailed, &scan.Handler{}) notifier.Subscribe(event.TopicScanningCompleted, &scan.Handler{}) notifier.Subscribe(event.TopicDeleteArtifact, &scan.DelArtHandler{}) + notifier.Subscribe(event.TopicReplication, &artifact.ReplicationHandler{}) // replication notifier.Subscribe(event.TopicPushArtifact, &replication.Handler{}) diff --git a/src/api/event/handler/webhook/artifact/replication.go b/src/api/event/handler/webhook/artifact/replication.go new file mode 100644 index 000000000..4ac9db842 --- /dev/null +++ b/src/api/event/handler/webhook/artifact/replication.go @@ -0,0 +1,206 @@ +package artifact + +import ( + "errors" + "fmt" + "strings" + + "github.com/goharbor/harbor/src/api/event" + "github.com/goharbor/harbor/src/api/event/handler/util" + commonModels "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/pkg/notification" + "github.com/goharbor/harbor/src/pkg/notifier/model" + notifyModel "github.com/goharbor/harbor/src/pkg/notifier/model" + "github.com/goharbor/harbor/src/replication" + rpModel "github.com/goharbor/harbor/src/replication/model" +) + +// ReplicationHandler preprocess replication event data +type ReplicationHandler struct { +} + +// Handle ... +func (r *ReplicationHandler) Handle(value interface{}) error { + if !config.NotificationEnable() { + log.Debug("notification feature is not enabled") + return nil + } + + rpEvent, ok := value.(*event.ReplicationEvent) + if !ok { + return errors.New("invalid replication event type") + } + if rpEvent == nil { + return fmt.Errorf("nil replication event") + } + + payload, project, err := constructReplicationPayload(rpEvent) + if err != nil { + return err + } + + policies, err := notification.PolicyMgr.GetRelatedPolices(project.ProjectID, rpEvent.EventType) + if err != nil { + log.Errorf("failed to find policy for %s event: %v", rpEvent.EventType, err) + return err + } + if len(policies) == 0 { + log.Debugf("cannot find policy for %s event: %v", rpEvent.EventType, rpEvent) + return nil + } + err = util.SendHookWithPolicies(policies, payload, rpEvent.EventType) + if err != nil { + return err + } + return nil +} + +// IsStateful ... +func (r *ReplicationHandler) IsStateful() bool { + return false +} + +func constructReplicationPayload(event *event.ReplicationEvent) (*model.Payload, *commonModels.Project, error) { + task, err := replication.OperationCtl.GetTask(event.ReplicationTaskID) + if err != nil { + log.Errorf("failed to get replication task %d: error: %v", event.ReplicationTaskID, err) + return nil, nil, err + } + if task == nil { + return nil, nil, fmt.Errorf("task %d not found with replication event", event.ReplicationTaskID) + } + + execution, err := replication.OperationCtl.GetExecution(task.ExecutionID) + if err != nil { + log.Errorf("failed to get replication execution %d: error: %v", task.ExecutionID, err) + return nil, nil, err + } + if execution == nil { + return nil, nil, fmt.Errorf("execution %d not found with replication event", task.ExecutionID) + } + + rpPolicy, err := replication.PolicyCtl.Get(execution.PolicyID) + if err != nil { + log.Errorf("failed to get replication policy %d: error: %v", execution.PolicyID, err) + return nil, nil, err + } + if rpPolicy == nil { + return nil, nil, fmt.Errorf("policy %d not found with replication event", execution.PolicyID) + } + + var remoteRegID int64 + if rpPolicy.SrcRegistry != nil && rpPolicy.SrcRegistry.ID > 0 { + remoteRegID = rpPolicy.SrcRegistry.ID + } + + if rpPolicy.DestRegistry != nil && rpPolicy.DestRegistry.ID > 0 { + remoteRegID = rpPolicy.DestRegistry.ID + } + + remoteRegistry, err := replication.RegistryMgr.Get(remoteRegID) + if err != nil { + log.Errorf("failed to get replication remoteRegistry registry %d: error: %v", remoteRegID, err) + return nil, nil, err + } + if remoteRegistry == nil { + return nil, nil, fmt.Errorf("registry %d not found with replication event", remoteRegID) + } + + srcNamespace, srcNameAndTag := getMetadataFromResource(task.SrcResource) + destNamespace, destNameAndTag := getMetadataFromResource(task.DstResource) + + extURL, err := config.ExtURL() + if err != nil { + log.Errorf("Error while reading external endpoint URL: %v", err) + } + hostname := strings.Split(extURL, ":")[0] + + remoteRes := &model.ReplicationResource{ + RegistryName: remoteRegistry.Name, + RegistryType: string(remoteRegistry.Type), + Endpoint: remoteRegistry.URL, + Namespace: srcNamespace, + } + + ext, err := config.ExtEndpoint() + if err != nil { + log.Errorf("Error while reading external endpoint: %v", err) + } + localRes := &model.ReplicationResource{ + RegistryType: string(rpModel.RegistryTypeHarbor), + Endpoint: ext, + Namespace: destNamespace, + } + + payload := ¬ifyModel.Payload{ + Type: event.EventType, + OccurAt: event.OccurAt.Unix(), + Operator: string(execution.Trigger), + EventData: &model.EventData{ + Replication: &model.Replication{ + HarborHostname: hostname, + JobStatus: event.Status, + Description: rpPolicy.Description, + PolicyCreator: rpPolicy.Creator, + ArtifactType: task.ResourceType, + AuthenticationType: string(remoteRegistry.Credential.Type), + OverrideMode: rpPolicy.Override, + TriggerType: string(execution.Trigger), + ExecutionTimestamp: execution.StartTime.Unix(), + }, + }, + } + + var prjName, nameAndTag string + // remote(src) -> local harbor(dest) + if rpPolicy.SrcRegistry != nil { + payload.EventData.Replication.SrcResource = remoteRes + payload.EventData.Replication.DestResource = localRes + prjName = destNamespace + nameAndTag = destNameAndTag + } + + // local harbor(src) -> remote(dest) + if rpPolicy.DestRegistry != nil { + payload.EventData.Replication.DestResource = remoteRes + payload.EventData.Replication.SrcResource = localRes + prjName = srcNamespace + nameAndTag = srcNameAndTag + } + + if event.Status == string(job.SuccessStatus) { + succeedArtifact := &model.ArtifactInfo{ + Type: task.ResourceType, + Status: task.Status, + NameAndTag: nameAndTag, + } + payload.EventData.Replication.SuccessfulArtifact = []*model.ArtifactInfo{succeedArtifact} + } + if event.Status == string(job.ErrorStatus) { + failedArtifact := &model.ArtifactInfo{ + Type: task.ResourceType, + Status: task.Status, + NameAndTag: nameAndTag, + } + payload.EventData.Replication.FailedArtifact = []*model.ArtifactInfo{failedArtifact} + } + + project, err := config.GlobalProjectMgr.Get(prjName) + if err != nil { + log.Errorf("failed to get project %s, error: %v", prjName, err) + return nil, nil, err + } + if project == nil { + return nil, nil, fmt.Errorf("project %s not found of replication event", prjName) + } + + return payload, project, nil +} + +func getMetadataFromResource(resource string) (namespace, nameAndTag string) { + meta := strings.Split(resource, "/") + return meta[0], meta[1] +} diff --git a/src/api/event/handler/webhook/artifact/replication_test.go b/src/api/event/handler/webhook/artifact/replication_test.go new file mode 100644 index 000000000..10006fcd2 --- /dev/null +++ b/src/api/event/handler/webhook/artifact/replication_test.go @@ -0,0 +1,285 @@ +package artifact + +import ( + "github.com/goharbor/harbor/src/api/event" + "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/promgr/metamgr" + "github.com/goharbor/harbor/src/pkg/notification" + "github.com/goharbor/harbor/src/replication" + daoModels "github.com/goharbor/harbor/src/replication/dao/models" + "github.com/goharbor/harbor/src/replication/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "testing" + "time" +) + +type fakedNotificationPolicyMgr struct { +} + +type fakedReplicationPolicyMgr struct { +} + +type fakedReplicationMgr struct { +} + +type fakedReplicationRegistryMgr struct { +} + +type fakedProjectMgr struct { +} + +func (f *fakedNotificationPolicyMgr) Create(*models.NotificationPolicy) (int64, error) { + return 0, nil +} + +// List the policies, returns the policy list and error +func (f *fakedNotificationPolicyMgr) List(int64) ([]*models.NotificationPolicy, error) { + return nil, nil +} + +// Get policy with specified ID +func (f *fakedNotificationPolicyMgr) Get(int64) (*models.NotificationPolicy, error) { + return nil, nil +} + +// GetByNameAndProjectID get policy by the name and projectID +func (f *fakedNotificationPolicyMgr) GetByNameAndProjectID(string, int64) (*models.NotificationPolicy, error) { + return nil, nil +} + +// Update the specified policy +func (f *fakedNotificationPolicyMgr) Update(*models.NotificationPolicy) error { + return nil +} + +// Delete the specified policy +func (f *fakedNotificationPolicyMgr) Delete(int64) error { + return nil +} + +// Test the specified policy +func (f *fakedNotificationPolicyMgr) Test(*models.NotificationPolicy) error { + return nil +} + +// GetRelatedPolices get event type related policies in project +func (f *fakedNotificationPolicyMgr) GetRelatedPolices(int64, string) ([]*models.NotificationPolicy, error) { + return []*models.NotificationPolicy{ + { + ID: 0, + }, + }, nil +} + +func (f *fakedReplicationMgr) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) { + return 0, nil +} +func (f *fakedReplicationMgr) StopReplication(int64) error { + return nil +} +func (f *fakedReplicationMgr) ListExecutions(...*daoModels.ExecutionQuery) (int64, []*daoModels.Execution, error) { + return 0, nil, nil +} +func (f *fakedReplicationMgr) GetExecution(int64) (*daoModels.Execution, error) { + return &daoModels.Execution{ + PolicyID: 1, + Trigger: "manual", + }, nil +} +func (f *fakedReplicationMgr) ListTasks(...*daoModels.TaskQuery) (int64, []*daoModels.Task, error) { + return 0, nil, nil +} +func (f *fakedReplicationMgr) GetTask(int64) (*daoModels.Task, error) { + return &daoModels.Task{ + ExecutionID: 1, + SrcResource: "library/alpine:[v1]", + DstResource: "gxt/alpine:[v1] ", + }, nil +} +func (f *fakedReplicationMgr) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error { + return nil +} +func (f *fakedReplicationMgr) GetTaskLog(int64) ([]byte, error) { + return nil, nil +} + +// Create new policy +func (f *fakedReplicationPolicyMgr) Create(*model.Policy) (int64, error) { + return 0, nil +} + +// List the policies, returns the total count, policy list and error +func (f *fakedReplicationPolicyMgr) List(...*model.PolicyQuery) (int64, []*model.Policy, error) { + return 0, nil, nil +} + +// Get policy with specified ID +func (f *fakedReplicationPolicyMgr) Get(int64) (*model.Policy, error) { + return &model.Policy{ + ID: 1, + SrcRegistry: &model.Registry{ + ID: 0, + }, + DestRegistry: &model.Registry{ + ID: 0, + }, + }, nil +} + +// Get policy by the name +func (f *fakedReplicationPolicyMgr) GetByName(string) (*model.Policy, error) { + return nil, nil +} + +// Update the specified policy +func (f *fakedReplicationPolicyMgr) Update(policy *model.Policy) error { + return nil +} + +// Remove the specified policy +func (f *fakedReplicationPolicyMgr) Remove(int64) error { + return nil +} + +// Add new registry +func (f *fakedReplicationRegistryMgr) Add(*model.Registry) (int64, error) { + return 0, nil +} + +// List registries, returns total count, registry list and error +func (f *fakedReplicationRegistryMgr) List(...*model.RegistryQuery) (int64, []*model.Registry, error) { + return 0, nil, nil +} + +// Get the specified registry +func (f *fakedReplicationRegistryMgr) Get(int64) (*model.Registry, error) { + return &model.Registry{ + Type: "harbor", + Credential: &model.Credential{ + Type: "local", + }, + }, nil +} + +// GetByName gets registry by name +func (f *fakedReplicationRegistryMgr) GetByName(name string) (*model.Registry, error) { + return nil, nil +} + +// Update the registry, the "props" are the properties of registry +// that need to be updated +func (f *fakedReplicationRegistryMgr) Update(registry *model.Registry, props ...string) error { + return nil +} + +// Remove the registry with the specified ID +func (f *fakedReplicationRegistryMgr) Remove(int64) error { + return nil +} + +// HealthCheck checks health status of all registries and update result in database +func (f *fakedReplicationRegistryMgr) HealthCheck() error { + return nil +} + +func (f *fakedProjectMgr) Get(projectIDOrName interface{}) (*models.Project, error) { + return &models.Project{ProjectID: 1}, nil +} +func (f *fakedProjectMgr) Create(*models.Project) (int64, error) { + return 0, nil +} +func (f *fakedProjectMgr) Delete(projectIDOrName interface{}) error { + return nil +} +func (f *fakedProjectMgr) Update(projectIDOrName interface{}, project *models.Project) error { + return nil +} +func (f *fakedProjectMgr) List(query *models.ProjectQueryParam) (*models.ProjectQueryResult, error) { + return nil, nil +} +func (f *fakedProjectMgr) IsPublic(projectIDOrName interface{}) (bool, error) { + return true, nil +} +func (f *fakedProjectMgr) Exists(projectIDOrName interface{}) (bool, error) { + return false, nil +} + +// get all public project +func (f *fakedProjectMgr) GetPublic() ([]*models.Project, error) { + return nil, nil +} + +// if the project manager uses a metadata manager, return it, otherwise return nil +func (f *fakedProjectMgr) GetMetadataManager() metamgr.ProjectMetadataManager { + return nil +} + +func TestReplicationHandler_Handle(t *testing.T) { + config.Init() + + PolicyMgr := notification.PolicyMgr + execution := replication.OperationCtl + rpPolicy := replication.PolicyCtl + rpRegistry := replication.RegistryMgr + project := config.GlobalProjectMgr + + defer func() { + notification.PolicyMgr = PolicyMgr + replication.OperationCtl = execution + replication.PolicyCtl = rpPolicy + replication.RegistryMgr = rpRegistry + config.GlobalProjectMgr = project + }() + notification.PolicyMgr = &fakedNotificationPolicyMgr{} + replication.OperationCtl = &fakedReplicationMgr{} + replication.PolicyCtl = &fakedReplicationPolicyMgr{} + replication.RegistryMgr = &fakedReplicationRegistryMgr{} + config.GlobalProjectMgr = &fakedProjectMgr{} + + handler := &ReplicationHandler{} + + type args struct { + data interface{} + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "ImagePreprocessHandler Want Error 1", + args: args{ + data: "", + }, + wantErr: true, + }, + { + name: "ImagePreprocessHandler 1", + args: args{ + data: &event.ReplicationEvent{ + OccurAt: time.Now(), + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := handler.Handle(tt.args.data) + if tt.wantErr { + require.NotNil(t, err, "Error: %s", err) + return + } + assert.Nil(t, err) + }) + } + +} + +func TestReplicationHandler_IsStateful(t *testing.T) { + handler := &ReplicationHandler{} + assert.False(t, handler.IsStateful()) +} diff --git a/src/api/event/metadata/replication.go b/src/api/event/metadata/replication.go new file mode 100644 index 000000000..91022c56c --- /dev/null +++ b/src/api/event/metadata/replication.go @@ -0,0 +1,28 @@ +package metadata + +import ( + "time" + + event2 "github.com/goharbor/harbor/src/api/event" + "github.com/goharbor/harbor/src/pkg/notifier/event" +) + +// ReplicationMetaData defines replication related event data +type ReplicationMetaData struct { + ReplicationTaskID int64 + Status string +} + +// Resolve replication metadata into replication event +func (r *ReplicationMetaData) Resolve(evt *event.Event) error { + data := &event2.ReplicationEvent{ + ReplicationTaskID: r.ReplicationTaskID, + EventType: event2.TopicReplication, + OccurAt: time.Now(), + Status: r.Status, + } + + evt.Topic = event2.TopicReplication + evt.Data = data + return nil +} diff --git a/src/api/event/topic.go b/src/api/event/topic.go index e7dc245ff..d9b772433 100644 --- a/src/api/event/topic.go +++ b/src/api/event/topic.go @@ -44,6 +44,7 @@ const ( TopicUploadChart = "UPLOAD_CHART" TopicDownloadChart = "DOWNLOAD_CHART" TopicDeleteChart = "DELETE_CHART" + TopicReplication = "REPLICATION" ) // CreateProjectEvent is the creating project event @@ -264,3 +265,11 @@ type ImgResource struct { Digest string Tag string } + +// ReplicationEvent is replication related event data to publish +type ReplicationEvent struct { + EventType string + ReplicationTaskID int64 + OccurAt time.Time + Status string +} diff --git a/src/common/dao/pgsql.go b/src/common/dao/pgsql.go index 41399d5c1..2e2c8a914 100644 --- a/src/common/dao/pgsql.go +++ b/src/common/dao/pgsql.go @@ -16,12 +16,12 @@ package dao import ( "fmt" - "github.com/goharbor/harbor/src/common/models" "net/url" "os" "strconv" "github.com/astaxie/beego/orm" + "github.com/goharbor/harbor/src/common/models" "github.com/golang-migrate/migrate" _ "github.com/golang-migrate/migrate/database/postgres" // import pgsql driver for migrator _ "github.com/golang-migrate/migrate/source/file" // import local file driver for migrator diff --git a/src/core/api/notification_policy.go b/src/core/api/notification_policy.go index cae117859..23057bde7 100755 --- a/src/core/api/notification_policy.go +++ b/src/core/api/notification_policy.go @@ -383,6 +383,7 @@ func initSupportedEvents() map[string]struct{} { event.TopicQuotaWarning, event.TopicScanningFailed, event.TopicScanningCompleted, + event.TopicReplication, } var supportedEventTypes = make(map[string]struct{}) diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index 2e5cbac77..a717235bc 100755 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -154,11 +154,29 @@ func (h *Handler) HandleReplicationScheduleJob() { // HandleReplicationTask handles the webhook of replication task func (h *Handler) HandleReplicationTask() { log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status) + if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus, h.revision); err != nil { log.Errorf("failed to update the status of the replication task %d: %v", h.id, err) h.SendInternalServerError(err) return } + + // Trigger artifict webhook event only for JobFinished and JobError status + if h.status == models.JobFinished || h.status == models.JobError || h.status == models.JobStopped { + e := &event.Event{} + metaData := &metadata.ReplicationMetaData{ + ReplicationTaskID: h.id, + Status: h.rawStatus, + } + + if err := e.Build(metaData); err == nil { + if err := e.Publish(); err != nil { + log.Error(errors.Wrap(err, "replication job hook handler: event publish")) + } + } else { + log.Error(errors.Wrap(err, "replication job hook handler: event publish")) + } + } } // HandleRetentionTask handles the webhook of retention task diff --git a/src/pkg/notification/notification.go b/src/pkg/notification/notification.go index db658ab84..de66726c5 100755 --- a/src/pkg/notification/notification.go +++ b/src/pkg/notification/notification.go @@ -41,6 +41,7 @@ func Init() { JobMgr = jobMgr.NewDefaultManager() SupportedNotifyTypes = make(map[string]struct{}) + initSupportedNotifyType(model.NotifyTypeHTTP, model.NotifyTypeSlack) log.Info("notification initialization completed") diff --git a/src/pkg/notifier/model/event.go b/src/pkg/notifier/model/event.go index 84f8a9a87..a1d63ba1d 100755 --- a/src/pkg/notifier/model/event.go +++ b/src/pkg/notifier/model/event.go @@ -22,9 +22,10 @@ type Payload struct { // EventData of notification event payload type EventData struct { - Resources []*Resource `json:"resources"` - Repository *Repository `json:"repository"` - Custom map[string]string `json:"custom_attributes,omitempty"` + Resources []*Resource `json:"resources,omitempty"` + Repository *Repository `json:"repository,omitempty"` + Replication *Replication `json:"replication,omitempty"` + Custom map[string]string `json:"custom_attributes,omitempty"` } // Resource describe infos of resource triggered notification @@ -43,3 +44,37 @@ type Repository struct { RepoFullName string `json:"repo_full_name"` RepoType string `json:"repo_type"` } + +// Replication describes replication infos +type Replication struct { + HarborHostname string `json:"harbor_hostname,omitempty"` + JobStatus string `json:"job_status,omitempty"` + Description string `json:"description,omitempty"` + ArtifactType string `json:"artifact_type,omitempty"` + AuthenticationType string `json:"authentication_type,omitempty"` + OverrideMode bool `json:"override_mode,omitempty"` + TriggerType string `json:"trigger_type,omitempty"` + PolicyCreator string `json:"policy_creator,omitempty"` + ExecutionTimestamp int64 `json:"execution_timestamp,omitempty"` + SrcResource *ReplicationResource `json:"src_resource,omitempty"` + DestResource *ReplicationResource `json:"dest_resource,omitempty"` + SuccessfulArtifact []*ArtifactInfo `json:"successful_artifact,omitempty"` + FailedArtifact []*ArtifactInfo `json:"failed_artifact,omitempty"` +} + +// ArtifactInfo describe info of artifact replicated +type ArtifactInfo struct { + Type string `json:"type"` + Status string `json:"status"` + NameAndTag string `json:"name_tag"` + FailReason string `json:"fail_reason,omitempty"` +} + +// ReplicationResource describes replication resource info +type ReplicationResource struct { + RegistryName string `json:"registry_name,omitempty"` + RegistryType string `json:"registry_type"` + Endpoint string `json:"endpoint"` + Provider string `json:"provider,omitempty"` + Namespace string `json:"namespace,omitempty"` +}