mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-24 09:38:09 +01:00
Merge pull request #21 from lightof410/gxt/webhook-dev-20190807
add webhook job in jobservice
This commit is contained in:
commit
9cb4745ee3
@ -64,6 +64,10 @@ jobservice:
|
||||
# Maximum number of job workers in job service
|
||||
max_job_workers: 10
|
||||
|
||||
notification:
|
||||
# Maximum retry count for webhook job
|
||||
webhook_job_max_retry: 10
|
||||
|
||||
chart:
|
||||
# Change the value of absolute_url to enabled can enable absolute url in chart
|
||||
absolute_url: disabled
|
||||
|
@ -1,3 +1,4 @@
|
||||
CORE_SECRET={{core_secret}}
|
||||
JOBSERVICE_SECRET={{jobservice_secret}}
|
||||
CORE_URL={{core_url}}
|
||||
JOBSERVICE_WEBHOOK_JOB_MAX_RETRY={{notification_webhook_job_max_retry}}
|
||||
|
@ -188,6 +188,9 @@ def parse_yaml_config(config_file_path):
|
||||
config_dict['max_job_workers'] = js_config["max_job_workers"]
|
||||
config_dict['jobservice_secret'] = generate_random_string(16)
|
||||
|
||||
# notification config
|
||||
notification_config = configs.get('notification') or {}
|
||||
config_dict['notification_webhook_job_max_retry'] = notification_config["webhook_job_max_retry"]
|
||||
|
||||
# Log configs
|
||||
allowed_levels = ['debug', 'info', 'warning', 'error', 'fatal']
|
||||
|
99
src/jobservice/job/impl/notification/webhook_job.go
Normal file
99
src/jobservice/job/impl/notification/webhook_job.go
Normal file
@ -0,0 +1,99 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
commonhttp "github.com/goharbor/harbor/src/common/http"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Max retry has the same meaning as max fails.
|
||||
const maxFails = "JOBSERVICE_WEBHOOK_JOB_MAX_RETRY"
|
||||
|
||||
// WebhookJob implements the job interface, which send notification by http or https.
|
||||
type WebhookJob struct {
|
||||
client *http.Client
|
||||
logger logger.Interface
|
||||
ctx job.Context
|
||||
}
|
||||
|
||||
// MaxFails returns that how many times this job can fail, get this value from ctx.
|
||||
func (wj *WebhookJob) MaxFails() uint {
|
||||
if maxFails, exist := os.LookupEnv(maxFails); exist {
|
||||
result, err := strconv.ParseUint(maxFails, 10, 32)
|
||||
// Unable to log error message because the logger isn't initialized when calling this function.
|
||||
if err == nil {
|
||||
return uint(result)
|
||||
}
|
||||
}
|
||||
|
||||
// Default max fails count is 10, and its max retry interval is around 3h
|
||||
// Large enough to ensure most situations can notify successfully
|
||||
return 10
|
||||
}
|
||||
|
||||
// ShouldRetry ...
|
||||
func (wj *WebhookJob) ShouldRetry() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Validate implements the interface in job/Interface
|
||||
func (wj *WebhookJob) Validate(params job.Parameters) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run implements the interface in job/Interface
|
||||
func (wj *WebhookJob) Run(ctx job.Context, params job.Parameters) error {
|
||||
if err := wj.init(ctx, params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return wj.execute(ctx, params)
|
||||
}
|
||||
|
||||
// init webhook job
|
||||
func (wj *WebhookJob) init(ctx job.Context, params map[string]interface{}) error {
|
||||
wj.logger = ctx.GetLogger()
|
||||
wj.ctx = ctx
|
||||
|
||||
// default insecureSkipVerify is false
|
||||
insecureSkipVerify := false
|
||||
if v, ok := params["skip_cert_verify"]; ok {
|
||||
insecureSkipVerify = v.(bool)
|
||||
}
|
||||
wj.client = &http.Client{
|
||||
Transport: commonhttp.GetHTTPTransport(insecureSkipVerify),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// execute webhook job
|
||||
func (wj *WebhookJob) execute(ctx job.Context, params map[string]interface{}) error {
|
||||
payload := params["payload"].(string)
|
||||
address := params["address"].(string)
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader([]byte(payload)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if v, ok := params["auth_header"]; ok && len(v.(string)) > 0 {
|
||||
req.Header.Set("Authorization", v.(string))
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := wj.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("webhook job(target: %s) response code is %d", address, resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
75
src/jobservice/job/impl/notification/webhook_job_test.go
Normal file
75
src/jobservice/job/impl/notification/webhook_job_test.go
Normal file
@ -0,0 +1,75 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMaxFails(t *testing.T) {
|
||||
rep := &WebhookJob{}
|
||||
// test default max fails
|
||||
assert.Equal(t, uint(10), rep.MaxFails())
|
||||
|
||||
// test user defined max fails
|
||||
_ = os.Setenv(maxFails, "15")
|
||||
assert.Equal(t, uint(15), rep.MaxFails())
|
||||
|
||||
// test user defined wrong max fails
|
||||
_ = os.Setenv(maxFails, "abc")
|
||||
assert.Equal(t, uint(10), rep.MaxFails())
|
||||
}
|
||||
|
||||
func TestShouldRetry(t *testing.T) {
|
||||
rep := &WebhookJob{}
|
||||
assert.True(t, rep.ShouldRetry())
|
||||
}
|
||||
|
||||
func TestValidate(t *testing.T) {
|
||||
rep := &WebhookJob{}
|
||||
assert.Nil(t, rep.Validate(nil))
|
||||
}
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
rep := &WebhookJob{}
|
||||
|
||||
// test webhook request
|
||||
ts := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
|
||||
// test request method
|
||||
assert.Equal(t, http.MethodPost, r.Method)
|
||||
// test request header
|
||||
assert.Equal(t, "auth_test", r.Header.Get("Authorization"))
|
||||
// test request body
|
||||
assert.Equal(t, string(body), `{"key": "value"}`)
|
||||
}))
|
||||
defer ts.Close()
|
||||
params := map[string]interface{}{
|
||||
"skip_cert_verify": true,
|
||||
"payload": `{"key": "value"}`,
|
||||
"address": ts.URL,
|
||||
"auth_header": "auth_test",
|
||||
}
|
||||
// test correct webhook response
|
||||
assert.Nil(t, rep.Run(&impl.Context{}, params))
|
||||
|
||||
tsWrong := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
}))
|
||||
defer tsWrong.Close()
|
||||
paramsWrong := map[string]interface{}{
|
||||
"skip_cert_verify": true,
|
||||
"payload": `{"key": "value"}`,
|
||||
"address": tsWrong.URL,
|
||||
"auth_header": "auth_test",
|
||||
}
|
||||
// test incorrect webhook response
|
||||
assert.NotNil(t, rep.Run(&impl.Context{}, paramsWrong))
|
||||
}
|
@ -30,6 +30,8 @@ const (
|
||||
Replication = "REPLICATION"
|
||||
// ReplicationScheduler : the name of the replication scheduler job in job service
|
||||
ReplicationScheduler = "IMAGE_REPLICATE"
|
||||
// WebhookJob : the name of the webhook job in job service
|
||||
WebhookJob = "WEBHOOK"
|
||||
// Retention : the name of the retention job
|
||||
Retention = "RETENTION"
|
||||
)
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/jobservice/hook"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/gc"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/notification"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/replication"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/scan"
|
||||
@ -248,6 +249,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
|
||||
job.ReplicationScheduler: (*replication.Scheduler)(nil),
|
||||
job.Retention: (*retention.Job)(nil),
|
||||
scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
|
||||
job.WebhookJob: (*notification.WebhookJob)(nil),
|
||||
}); err != nil {
|
||||
// exit
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user