add webhook job

Signed-off-by: 王添 <wangtian@corp.netease.com>
This commit is contained in:
王添 2019-08-07 20:56:31 +08:00
parent e7fafd1941
commit 94d4f9c6b6
7 changed files with 186 additions and 0 deletions

View File

@ -64,6 +64,10 @@ jobservice:
# Maximum number of job workers in job service # Maximum number of job workers in job service
max_job_workers: 10 max_job_workers: 10
notification:
# Maximum retry count for webhook job
webhook_job_max_retry: 10
chart: chart:
# Change the value of absolute_url to enabled can enable absolute url in chart # Change the value of absolute_url to enabled can enable absolute url in chart
absolute_url: disabled absolute_url: disabled

View File

@ -1,3 +1,4 @@
CORE_SECRET={{core_secret}} CORE_SECRET={{core_secret}}
JOBSERVICE_SECRET={{jobservice_secret}} JOBSERVICE_SECRET={{jobservice_secret}}
CORE_URL={{core_url}} CORE_URL={{core_url}}
JOBSERVICE_WEBHOOK_JOB_MAX_RETRY={{notification_webhook_job_max_retry}}

View File

@ -188,6 +188,9 @@ def parse_yaml_config(config_file_path):
config_dict['max_job_workers'] = js_config["max_job_workers"] config_dict['max_job_workers'] = js_config["max_job_workers"]
config_dict['jobservice_secret'] = generate_random_string(16) config_dict['jobservice_secret'] = generate_random_string(16)
# notification config
notification_config = configs.get('notification') or {}
config_dict['notification_webhook_job_max_retry'] = notification_config["webhook_job_max_retry"]
# Log configs # Log configs
allowed_levels = ['debug', 'info', 'warning', 'error', 'fatal'] allowed_levels = ['debug', 'info', 'warning', 'error', 'fatal']

View File

@ -0,0 +1,99 @@
package notification
import (
"bytes"
"fmt"
commonhttp "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"net/http"
"os"
"strconv"
)
// Max retry has the same meaning as max fails.
const maxFails = "JOBSERVICE_WEBHOOK_JOB_MAX_RETRY"
// WebhookJob implements the job interface, which send notification by http or https.
type WebhookJob struct {
client *http.Client
logger logger.Interface
ctx job.Context
}
// MaxFails returns that how many times this job can fail, get this value from ctx.
func (wj *WebhookJob) MaxFails() uint {
if maxFails, exist := os.LookupEnv(maxFails); exist {
result, err := strconv.ParseUint(maxFails, 10, 32)
// Unable to log error message because the logger isn't initialized when calling this function.
if err == nil {
return uint(result)
}
}
// Default max fails count is 10, and its max retry interval is around 3h
// Large enough to ensure most situations can notify successfully
return 10
}
// ShouldRetry ...
func (wj *WebhookJob) ShouldRetry() bool {
return true
}
// Validate implements the interface in job/Interface
func (wj *WebhookJob) Validate(params job.Parameters) error {
return nil
}
// Run implements the interface in job/Interface
func (wj *WebhookJob) Run(ctx job.Context, params job.Parameters) error {
if err := wj.init(ctx, params); err != nil {
return err
}
return wj.execute(ctx, params)
}
// init webhook job
func (wj *WebhookJob) init(ctx job.Context, params map[string]interface{}) error {
wj.logger = ctx.GetLogger()
wj.ctx = ctx
// default insecureSkipVerify is false
insecureSkipVerify := false
if v, ok := params["skip_cert_verify"]; ok {
insecureSkipVerify = v.(bool)
}
wj.client = &http.Client{
Transport: commonhttp.GetHTTPTransport(insecureSkipVerify),
}
return nil
}
// execute webhook job
func (wj *WebhookJob) execute(ctx job.Context, params map[string]interface{}) error {
payload := params["payload"].(string)
address := params["address"].(string)
req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader([]byte(payload)))
if err != nil {
return err
}
if v, ok := params["auth_header"]; ok && len(v.(string)) > 0 {
req.Header.Set("Authorization", v.(string))
}
req.Header.Set("Content-Type", "application/json")
resp, err := wj.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("webhook job(target: %s) response code is %d", address, resp.StatusCode)
}
return nil
}

View File

@ -0,0 +1,75 @@
package notification
import (
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
)
func TestMaxFails(t *testing.T) {
rep := &WebhookJob{}
// test default max fails
assert.Equal(t, uint(10), rep.MaxFails())
// test user defined max fails
_ = os.Setenv(maxFails, "15")
assert.Equal(t, uint(15), rep.MaxFails())
// test user defined wrong max fails
_ = os.Setenv(maxFails, "abc")
assert.Equal(t, uint(10), rep.MaxFails())
}
func TestShouldRetry(t *testing.T) {
rep := &WebhookJob{}
assert.True(t, rep.ShouldRetry())
}
func TestValidate(t *testing.T) {
rep := &WebhookJob{}
assert.Nil(t, rep.Validate(nil))
}
func TestRun(t *testing.T) {
rep := &WebhookJob{}
// test webhook request
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
// test request method
assert.Equal(t, http.MethodPost, r.Method)
// test request header
assert.Equal(t, "auth_test", r.Header.Get("Authorization"))
// test request body
assert.Equal(t, string(body), `{"key": "value"}`)
}))
defer ts.Close()
params := map[string]interface{}{
"skip_cert_verify": true,
"payload": `{"key": "value"}`,
"address": ts.URL,
"auth_header": "auth_test",
}
// test correct webhook response
assert.Nil(t, rep.Run(&impl.Context{}, params))
tsWrong := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
}))
defer tsWrong.Close()
paramsWrong := map[string]interface{}{
"skip_cert_verify": true,
"payload": `{"key": "value"}`,
"address": tsWrong.URL,
"auth_header": "auth_test",
}
// test incorrect webhook response
assert.NotNil(t, rep.Run(&impl.Context{}, paramsWrong))
}

View File

@ -30,6 +30,8 @@ const (
Replication = "REPLICATION" Replication = "REPLICATION"
// ReplicationScheduler : the name of the replication scheduler job in job service // ReplicationScheduler : the name of the replication scheduler job in job service
ReplicationScheduler = "IMAGE_REPLICATE" ReplicationScheduler = "IMAGE_REPLICATE"
// WebhookJob : the name of the webhook job in job service
WebhookJob = "WEBHOOK"
// Retention : the name of the retention job // Retention : the name of the retention job
Retention = "RETENTION" Retention = "RETENTION"
) )

View File

@ -33,6 +33,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/hook" "github.com/goharbor/harbor/src/jobservice/hook"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/job/impl/gc" "github.com/goharbor/harbor/src/jobservice/job/impl/gc"
"github.com/goharbor/harbor/src/jobservice/job/impl/notification"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication" "github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/sample" "github.com/goharbor/harbor/src/jobservice/job/impl/sample"
"github.com/goharbor/harbor/src/jobservice/job/impl/scan" "github.com/goharbor/harbor/src/jobservice/job/impl/scan"
@ -248,6 +249,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
job.ReplicationScheduler: (*replication.Scheduler)(nil), job.ReplicationScheduler: (*replication.Scheduler)(nil),
job.Retention: (*retention.Job)(nil), job.Retention: (*retention.Job)(nil),
scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil), scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
job.WebhookJob: (*notification.WebhookJob)(nil),
}); err != nil { }); err != nil {
// exit // exit
return nil, err return nil, err