This commit is contained in:
Akshat 2024-04-30 11:42:08 +02:00 committed by GitHub
commit 0ad9864141
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 535 additions and 10 deletions

View File

@ -31,7 +31,7 @@ var (
Ctl = NewController()
// webhookJobVendors represents webhook(http) or slack.
webhookJobVendors = q.NewOrList([]interface{}{job.WebhookJobVendorType, job.SlackJobVendorType})
webhookJobVendors = q.NewOrList([]interface{}{job.WebhookJobVendorType, job.SlackJobVendorType, job.TeamsJobVendorType})
)
type Controller interface {
@ -103,13 +103,16 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy) err
func (c *controller) DeletePolicy(ctx context.Context, policyID int64) error {
// delete executions under the webhook policy,
// there are two vendor types(webhook & slack) needs to be deleted.
// there are three vendor types(webhook, slack & teams) needs to be deleted.
if err := c.execMgr.DeleteByVendor(ctx, job.WebhookJobVendorType, policyID); err != nil {
return errors.Wrapf(err, "failed to delete executions for webhook of policy %d", policyID)
}
if err := c.execMgr.DeleteByVendor(ctx, job.SlackJobVendorType, policyID); err != nil {
return errors.Wrapf(err, "failed to delete executions for slack of policy %d", policyID)
}
if err := c.execMgr.DeleteByVendor(ctx, job.TeamsJobVendorType, policyID); err != nil {
return errors.Wrapf(err, "failed to delete executions for teams of policy %d", policyID)
}
return c.policyMgr.Delete(ctx, policyID)
}

View File

@ -99,6 +99,12 @@ func (c *controllerTestSuite) TestDeletePolicy() {
err = c.ctl.DeletePolicy(context.TODO(), 1)
c.ErrorIs(err, delExecErr)
// failed to delete policy due to teams executions deletion error
c.execMgr.On("DeleteByVendor", mock.Anything, "WEBHOOK", mock.Anything).Return(nil).Once()
c.execMgr.On("DeleteByVendor", mock.Anything, "TEAMS", mock.Anything).Return(delExecErr).Once()
err = c.ctl.DeletePolicy(context.TODO(), 1)
c.ErrorIs(err, delExecErr)
// successfully deletion for all
c.execMgr.On("DeleteByVendor", mock.Anything, mock.Anything, mock.Anything).Return(nil)
c.policyMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)

View File

@ -0,0 +1,151 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notification
import (
"bytes"
"io"
"net/http"
"os"
"reflect"
"strconv"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/errors"
)
// TeamsJob implements the job interface, which send notification to teams by teams incoming webhooks.
type TeamsJob struct {
client *http.Client
logger logger.Interface
}
// MaxFails returns that how many times this job can fail.
func (tj *TeamsJob) MaxFails() (result uint) {
// Default max fails count is 3
result = 10
if maxFails, exist := os.LookupEnv(maxFails); exist {
mf, err := strconv.ParseUint(maxFails, 10, 32)
if err != nil {
logger.Warningf("Fetch teams job maxFails error: %s", err.Error())
return result
}
result = uint(mf)
}
return result
}
// MaxCurrency is implementation of same method in Interface.
func (tj *TeamsJob) MaxCurrency() uint {
return 1
}
// ShouldRetry ...
func (tj *TeamsJob) ShouldRetry() bool {
return true
}
// Validate implements the interface in job/Interface
func (tj *TeamsJob) Validate(params job.Parameters) error {
if params == nil {
// Params are required
return errors.New("missing parameter of teams job")
}
payload, ok := params["payload"]
if !ok {
return errors.Errorf("missing job parameter 'payload'")
}
_, ok = payload.(string)
if !ok {
return errors.Errorf("malformed job parameter 'payload', expecting string but got %s", reflect.TypeOf(payload).String())
}
address, ok := params["address"]
if !ok {
return errors.Errorf("missing job parameter 'address'")
}
_, ok = address.(string)
if !ok {
return errors.Errorf("malformed job parameter 'address', expecting string but got %s", reflect.TypeOf(address).String())
}
return nil
}
// Run implements the interface in job/Interface
func (tj *TeamsJob) Run(ctx job.Context, params job.Parameters) error {
if err := tj.init(ctx, params); err != nil {
return err
}
tj.logger.Info("start to run teams job")
err := tj.execute(params)
if err != nil {
tj.logger.Error(err)
} else {
tj.logger.Info("success to run teams job")
}
// Wait a second for teams rate limit, refer to https://learn.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL#rate-limiting-for-connectors
time.Sleep(time.Second)
return err
}
// init teams job
func (tj *TeamsJob) init(ctx job.Context, params map[string]interface{}) error {
tj.logger = ctx.GetLogger()
// default use secure transport
tj.client = httpHelper.clients[secure]
if v, ok := params["skip_cert_verify"]; ok {
if skipCertVerify, ok := v.(bool); ok && skipCertVerify {
// if skip cert verify is true, it means not verify remote cert, use insecure client
tj.client = httpHelper.clients[insecure]
}
}
return nil
}
// execute teams job
func (tj *TeamsJob) execute(params map[string]interface{}) error {
payload := params["payload"].(string)
address := params["address"].(string)
req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader([]byte(payload)))
if err != nil {
return errors.Wrap(err, "error to generate request")
}
req.Header.Set("Content-Type", "application/json")
tj.logger.Infof("send request to remote endpoint, body: %s", payload)
resp, err := tj.client.Do(req)
if err != nil {
return errors.Wrap(err, "error to send request")
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, err := io.ReadAll(resp.Body)
if err != nil {
tj.logger.Errorf("error to read response body, error: %s", err)
}
return errors.Errorf("abnormal response code: %d, body: %s", resp.StatusCode, string(body))
}
return nil
}

View File

@ -0,0 +1,87 @@
package notification
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/goharbor/harbor/src/jobservice/job"
mockjobservice "github.com/goharbor/harbor/src/testing/jobservice"
)
func TestTeamsJobMaxFails(t *testing.T) {
rep := &TeamsJob{}
t.Run("default max fails", func(t *testing.T) {
assert.Equal(t, uint(3), rep.MaxFails())
})
t.Run("user defined max fails", func(t *testing.T) {
t.Setenv(maxFails, "15")
assert.Equal(t, uint(15), rep.MaxFails())
})
t.Run("user defined wrong max fails", func(t *testing.T) {
t.Setenv(maxFails, "abc")
assert.Equal(t, uint(3), rep.MaxFails())
})
}
func TestTeamsJobShouldRetry(t *testing.T) {
rep := &TeamsJob{}
assert.True(t, rep.ShouldRetry())
}
func TestTeamsJobValidate(t *testing.T) {
rep := &TeamsJob{}
assert.NotNil(t, rep.Validate(nil))
jp := job.Parameters{
"address": "https://mydomain.webhook.office.com/webhookb2/akshat123/IncomingWebhook/akshat456/akshat789",
"payload": "teams payload",
}
assert.Nil(t, rep.Validate(jp))
}
func TestTeamsJobRun(t *testing.T) {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
rep := &TeamsJob{}
// test teams request
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
// test request method
assert.Equal(t, http.MethodPost, r.Method)
// test request body
assert.Equal(t, string(body), `{"key": "value"}`)
}))
defer ts.Close()
params := map[string]interface{}{
"skip_cert_verify": true,
"payload": `{"key": "value"}`,
"address": ts.URL,
}
// test correct teams response
assert.Nil(t, rep.Run(ctx, params))
tsWrong := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
}))
defer tsWrong.Close()
paramsWrong := map[string]interface{}{
"skip_cert_verify": true,
"payload": `{"key": "value"}`,
"address": tsWrong.URL,
}
// test incorrect teams response
assert.NotNil(t, rep.Run(ctx, paramsWrong))
}

View File

@ -30,6 +30,8 @@ const (
WebhookJobVendorType = "WEBHOOK"
// SlackJobVendorType : the name of the slack job in job service
SlackJobVendorType = "SLACK"
// TeamsJobVendorType : the name of the teams job in job service
TeamsJobVendorType = "TEAMS"
// RetentionVendorType : the name of the retention job
RetentionVendorType = "RETENTION"
// P2PPreheatVendorType : the name of the P2P preheat job
@ -57,6 +59,7 @@ var (
ExecSweepVendorType: 10,
GarbageCollectionVendorType: 50,
SlackJobVendorType: 50,
TeamsJobVendorType: 50,
WebhookJobVendorType: 50,
ReplicationVendorType: 50,
ScanDataExportVendorType: 50,

View File

@ -44,6 +44,8 @@ func (ps *defaultSampler) For(job string) uint {
return 1
case SlackJobVendorType:
return 1
case TeamsJobVendorType:
return 1
// add more cases here if specified job priority is required
// case XXX:
// return 2000

View File

@ -50,4 +50,7 @@ func (suite *PrioritySamplerSuite) Test() {
p4 := suite.sampler.For(SlackJobVendorType)
suite.Equal((uint)(1), p4, "Job priority for %s", SlackJobVendorType)
p5 := suite.sampler.For(TeamsJobVendorType)
suite.Equal((uint)(1), p5, "Job priority for %s", TeamsJobVendorType)
}

View File

@ -324,6 +324,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
job.WebhookJobVendorType: (*notification.WebhookJob)(nil),
job.SlackJobVendorType: (*notification.SlackJob)(nil),
job.TeamsJobVendorType: (*notification.TeamsJob)(nil),
job.P2PPreheatVendorType: (*preheat.Job)(nil),
job.ScanDataExportVendorType: (*scandataexport.ScanDataExport)(nil),
// In v2.2 we migrate the scheduled replication, garbage collection and scan all to

View File

@ -52,6 +52,8 @@ func (hm *DefaultManager) StartHook(ctx context.Context, event *model.HookEvent,
vendorType = job.WebhookJobVendorType
case model.NotifyTypeSlack:
vendorType = job.SlackJobVendorType
case model.NotifyTypeTeams:
vendorType = job.TeamsJobVendorType
}
if len(vendorType) == 0 {

View File

@ -97,7 +97,7 @@ func initSupportedNotifyType() {
supportedEventTypes = append(supportedEventTypes, EventType(eventType))
}
notifyTypes := []string{notifier_model.NotifyTypeHTTP, notifier_model.NotifyTypeSlack}
notifyTypes := []string{notifier_model.NotifyTypeHTTP, notifier_model.NotifyTypeSlack, notifier_model.NotifyTypeTeams}
for _, notifyType := range notifyTypes {
supportedNotifyTypes = append(supportedNotifyTypes, NotifyType(notifyType))
}

View File

@ -0,0 +1,144 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notification
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"text/template"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notifier/model"
)
const (
// TeamsBodyTemplate defines Teams request body template
TeamsBodyTemplate = `{
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"contentUrl": null,
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.4",
"body": [
{
"type": "TextBlock",
"text": "**Harbor webhook events**"
},
{
"type": "TextBlock",
"text": "**event_type:** {{.Type}}"
},
{
"type": "TextBlock",
"text": "**occur_at:** {{.OccurAt}}"
},
{
"type": "TextBlock",
"text": "**operator:** {{.Operator}}"
},
{
"type": "TextBlock",
"text": "**event_data:**"
},
{
"type": "TextBlock",
"text": "{{.EventData}}",
"wrap": true
}
]
}
}
]
}`
)
// TeamsHandler preprocess event data to teams and start the hook processing
type TeamsHandler struct {
}
// Name ...
func (s *TeamsHandler) Name() string {
return "Teams"
}
// Handle handles event to teams
func (s *TeamsHandler) Handle(ctx context.Context, value interface{}) error {
if value == nil {
return errors.New("TeamsHandler cannot handle nil value")
}
event, ok := value.(*model.HookEvent)
if !ok || event == nil {
return errors.New("invalid notification teams event")
}
return s.process(ctx, event)
}
// IsStateful ...
func (s *TeamsHandler) IsStateful() bool {
return false
}
func (s *TeamsHandler) process(ctx context.Context, event *model.HookEvent) error {
j := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.KindGeneric,
},
}
// Create a teamsJob to send message to teams
j.Name = job.TeamsJobVendorType
// Convert payload to teams format
payload, err := s.convert(event.Payload)
if err != nil {
return fmt.Errorf("convert payload to teams body failed: %v", err)
}
j.Parameters = map[string]interface{}{
"payload": payload,
"address": event.Target.Address,
"skip_cert_verify": event.Target.SkipCertVerify,
}
return notification.HookManager.StartHook(ctx, event, j)
}
func (s *TeamsHandler) convert(payLoad *model.Payload) (string, error) {
data := make(map[string]interface{})
data["Type"] = payLoad.Type
data["OccurAt"] = payLoad.OccurAt
data["Operator"] = payLoad.Operator
eventData, err := json.MarshalIndent(payLoad.EventData, "", "\t")
if err != nil {
return "", fmt.Errorf("marshal from eventData %v failed: %v", payLoad.EventData, err)
}
data["EventData"] = escapeEventData(string(eventData))
tt, _ := template.New("teams").Parse(TeamsBodyTemplate)
var teamsBuf bytes.Buffer
if err := tt.Execute(&teamsBuf, data); err != nil {
return "", fmt.Errorf("%v", err)
}
return teamsBuf.String(), nil
}

View File

@ -0,0 +1,107 @@
package notification
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/pkg/notification"
policy_model "github.com/goharbor/harbor/src/pkg/notification/policy/model"
"github.com/goharbor/harbor/src/pkg/notifier/event"
"github.com/goharbor/harbor/src/pkg/notifier/model"
)
func TestTeamsHandler_Handle(t *testing.T) {
hookMgr := notification.HookManager
defer func() {
notification.HookManager = hookMgr
}()
notification.HookManager = &fakedHookManager{}
handler := &TeamsHandler{}
type args struct {
event *event.Event
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "TeamsHandler_Handle Want Error 1",
args: args{
event: &event.Event{
Topic: "teams",
Data: nil,
},
},
wantErr: true,
},
{
name: "TeamsHandler_Handle Want Error 2",
args: args{
event: &event.Event{
Topic: "teams",
Data: &model.EventData{},
},
},
wantErr: true,
},
{
name: "TeamsHandler_Handle 1",
args: args{
event: &event.Event{
Topic: "teams",
Data: &model.HookEvent{
PolicyID: 1,
EventType: "pushImage",
Target: &policy_model.EventTarget{
Type: "teams",
Address: "http://127.0.0.1:8080",
},
Payload: &model.Payload{
OccurAt: time.Now().Unix(),
Type: "pushImage",
Operator: "admin",
EventData: &model.EventData{
Resources: []*model.Resource{
{
Tag: "v9.0",
},
},
Repository: &model.Repository{
Name: "library/debian",
},
},
},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := handler.Handle(context.TODO(), tt.args.event.Data)
if tt.wantErr {
require.NotNil(t, err, "Error: %s", err)
return
}
})
}
}
func TestTeamsHandler_IsStateful(t *testing.T) {
handler := &TeamsHandler{}
assert.False(t, handler.IsStateful())
}
func TestTeamsHandler_Name(t *testing.T) {
handler := &TeamsHandler{}
assert.Equal(t, "Teams", handler.Name())
}

View File

@ -18,4 +18,5 @@ package model
const (
NotifyTypeHTTP = "http"
NotifyTypeSlack = "slack"
NotifyTypeTeams = "teams"
)

View File

@ -20,6 +20,8 @@ const (
WebhookTopic = "http"
// SlackTopic is topic for sending slack payload
SlackTopic = "slack"
// TeamsTopic is topic for sending teams payload
TeamsTopic = "teams"
// EmailTopic is topic for sending email payload
EmailTopic = "email"
)

View File

@ -26,6 +26,7 @@ func init() {
handlersMap := map[string][]notifier.NotificationHandler{
model.WebhookTopic: {&notification.HTTPHandler{}},
model.SlackTopic: {&notification.SlackHandler{}},
model.TeamsTopic: {&notification.TeamsHandler{}},
}
for t, handlers := range handlersMap {

View File

@ -81,7 +81,7 @@ describe('AddWebhookFormComponent', () => {
'pushImage',
'deleteImage',
],
notify_type: ['http', 'slack'],
notify_type: ['http', 'slack', 'teams'],
};
beforeEach(async () => {

View File

@ -132,7 +132,7 @@ export class AddWebhookFormComponent implements OnInit, OnDestroy {
add() {
this.submitting = true;
if (this.webhook?.targets[0]?.type === WebhookType.SLACK) {
if (this.webhook?.targets[0]?.type === WebhookType.SLACK || this.webhook?.targets[0]?.type === WebhookType.TEAMS) {
delete this.webhook?.targets[0]?.payload_format;
}
this.webhookService
@ -155,7 +155,7 @@ export class AddWebhookFormComponent implements OnInit, OnDestroy {
save() {
this.submitting = true;
if (this.webhook?.targets[0]?.type === WebhookType.SLACK) {
if (this.webhook?.targets[0]?.type === WebhookType.SLACK || this.webhook?.targets[0]?.type === WebhookType.TEAMS) {
delete this.webhook?.targets[0]?.payload_format;
}
this.webhookService

View File

@ -37,7 +37,7 @@ describe('WebhookComponent', () => {
'pushImage',
'deleteImage',
],
notify_type: ['http', 'slack'],
notify_type: ['http', 'slack', 'teams'],
};
const mockedWehook: WebhookPolicy = {
id: 1,

View File

@ -40,11 +40,13 @@ export const PAYLOAD_FORMAT_I18N_MAP = {
export enum WebhookType {
HTTP = 'http',
SLACK = 'slack',
TEAMS = 'teams',
}
export enum VendorType {
WEBHOOK = 'WEBHOOK',
SLACK = 'SLACK',
TEAMS = 'TEAMS',
}
@Injectable()

View File

@ -43,6 +43,8 @@ func (n *WebhookJob) ToSwagger() *models.WebhookJob {
notifyType = "http"
} else if n.VendorType == job.SlackJobVendorType {
notifyType = "slack"
} else if n.VendorType == job.TeamsJobVendorType {
notifyType = "teams"
}
webhookJob.NotifyType = notifyType

View File

@ -80,7 +80,7 @@ func (n *webhookAPI) requireExecutionInPolicy(ctx context.Context, execID, polic
return err
}
if exec.VendorID == policyID && (exec.VendorType == job.WebhookJobVendorType || exec.VendorType == job.SlackJobVendorType) {
if exec.VendorID == policyID && (exec.VendorType == job.WebhookJobVendorType || exec.VendorType == job.SlackJobVendorType || exec.VendorType == job.TeamsJobVendorType) {
return nil
}
@ -423,6 +423,10 @@ func (n *webhookAPI) validateTargets(policy *policy_model.Policy) (bool, error)
return false, errors.New(nil).WithMessage("set payload format is not allowed for slack").WithCode(errors.BadRequestCode)
}
if len(target.PayloadFormat) > 0 && target.Type == "teams" {
return false, errors.New(nil).WithMessage("set payload format is not allowed for teams").WithCode(errors.BadRequestCode)
}
if len(target.PayloadFormat) > 0 && !isPayloadFormatSupported(target.PayloadFormat) {
return false, errors.New(nil).WithMessage("unsupported payload format type: %s", target.PayloadFormat).WithCode(errors.BadRequestCode)
}

View File

@ -57,7 +57,8 @@ func (suite *WebhookJobTestSuite) TestListWebhookJobs() {
suite.webhookCtl.On("CountExecutions", mock.Anything, policyID, mock.Anything).Return(int64(2), nil)
t1 := &task.Execution{ID: 1, VendorType: "WEBHOOK", VendorID: policyID, Status: "Success"}
t2 := &task.Execution{ID: 2, VendorType: "SLACK", VendorID: policyID, Status: "Stopped"}
suite.webhookCtl.On("ListExecutions", mock.Anything, policyID, mock.Anything).Return([]*task.Execution{t1, t2}, nil)
t3 := &task.Execution{ID: 2, VendorType: "TEAMS", VendorID: policyID, Status: "Stopped"}
suite.webhookCtl.On("ListExecutions", mock.Anything, policyID, mock.Anything).Return([]*task.Execution{t1, t2, t3}, nil)
{
// query has no policy id should got 422
@ -85,7 +86,7 @@ func (suite *WebhookJobTestSuite) TestListWebhookJobs() {
resp, err := suite.GetJSON(url, &body)
suite.NoError(err)
suite.Equal(200, resp.StatusCode)
suite.Len(body, 2)
suite.Len(body, 3)
// verify backward compatible
suite.Equal(body[0].ID, int64(1))
suite.Equal(body[0].NotifyType, "http")
@ -93,6 +94,9 @@ func (suite *WebhookJobTestSuite) TestListWebhookJobs() {
suite.Equal(body[1].ID, int64(2))
suite.Equal(body[1].NotifyType, "slack")
suite.Equal(body[1].Status, "Stopped")
suite.Equal(body[2].ID, int64(3))
suite.Equal(body[2].NotifyType, "teams")
suite.Equal(body[2].Status, "Stopped")
}
}