mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-01 21:47:57 +01:00
Merge branch 'job_service' into add_ut_period
This commit is contained in:
commit
e454456de2
@ -28,11 +28,13 @@ env:
|
||||
AUTH_MODE: db_auth
|
||||
SELF_REGISTRATION: on
|
||||
KEY_PATH: /data/secretkey
|
||||
REDIS_HOST: localhost
|
||||
|
||||
before_install:
|
||||
- sudo ./tests/hostcfg.sh
|
||||
- sudo ./tests/generateCerts.sh
|
||||
- sudo ./make/prepare
|
||||
- sudo mkdir -p "/data/redis"
|
||||
|
||||
install:
|
||||
- sudo apt-get update && sudo apt-get install -y libldap2-dev
|
||||
@ -114,6 +116,7 @@ script:
|
||||
- ./tests/swaggerchecker.sh
|
||||
- ./tests/startuptest.sh
|
||||
- ./tests/userlogintest.sh ${HARBOR_ADMIN} ${HARBOR_ADMIN_PASSWD}
|
||||
- export REDIS_HOST=$IP
|
||||
|
||||
# - sudo ./tests/testprepare.sh
|
||||
# - go test -v ./tests/apitests
|
||||
|
@ -16,7 +16,8 @@ import (
|
||||
type Client interface {
|
||||
SubmitJob(*models.JobData) (string, error)
|
||||
GetJobLog(uuid string) ([]byte, error)
|
||||
//TODO actions or stop? Redirect joblog when we see there's memory issue.
|
||||
PostAction(uuid, action string) error
|
||||
//TODO Redirect joblog when we see there's memory issue.
|
||||
}
|
||||
|
||||
// DefaultClient is the default implementation of Client interface
|
||||
@ -101,3 +102,14 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) {
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// PostAction call jobservice's API to operate action for job specified by uuid
|
||||
func (d *DefaultClient) PostAction(uuid, action string) error {
|
||||
url := d.endpoint + "/api/v1/jobs/" + uuid
|
||||
req := struct {
|
||||
Action string `json:"action"`
|
||||
}{
|
||||
Action: action,
|
||||
}
|
||||
return d.client.Post(url, req)
|
||||
}
|
||||
|
@ -31,4 +31,7 @@ const (
|
||||
JobServiceStatusSuccess = "Success"
|
||||
//JobServiceStatusScheduled : job status scheduled
|
||||
JobServiceStatusScheduled = "Scheduled"
|
||||
|
||||
// JobActionStop : the action to stop the job
|
||||
JobActionStop = "stop"
|
||||
)
|
||||
|
@ -31,6 +31,7 @@ var client = &http.Client{
|
||||
func TestLaunchJobFailed(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
resData, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(false))
|
||||
if e := expectFormatedError(resData, err); e != nil {
|
||||
@ -44,6 +45,7 @@ func TestLaunchJobFailed(t *testing.T) {
|
||||
func TestLaunchJobSucceed(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
res, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(true))
|
||||
if err != nil {
|
||||
@ -64,6 +66,7 @@ func TestLaunchJobSucceed(t *testing.T) {
|
||||
func TestGetJobFailed(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port))
|
||||
if e := expectFormatedError(res, err); e != nil {
|
||||
@ -77,6 +80,7 @@ func TestGetJobFailed(t *testing.T) {
|
||||
func TestGetJobSucceed(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port))
|
||||
if err != nil {
|
||||
@ -97,6 +101,7 @@ func TestGetJobSucceed(t *testing.T) {
|
||||
func TestJobActionFailed(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
actionReq, err := createJobActionReq("stop")
|
||||
if err != nil {
|
||||
@ -126,6 +131,7 @@ func TestJobActionFailed(t *testing.T) {
|
||||
func TestJobActionSucceed(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
actionReq, err := createJobActionReq("stop")
|
||||
if err != nil {
|
||||
@ -161,6 +167,7 @@ func TestJobActionSucceed(t *testing.T) {
|
||||
func TestCheckStatus(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/stats", port))
|
||||
if err != nil {
|
||||
@ -186,6 +193,7 @@ func TestCheckStatus(t *testing.T) {
|
||||
func TestGetJobLog(t *testing.T) {
|
||||
server, port, ctx := createServer()
|
||||
server.Start()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok/log", port))
|
||||
if err != nil {
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/errs"
|
||||
@ -55,12 +56,15 @@ type RedisJobStatsManager struct {
|
||||
stopChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
processChan chan *queueItem
|
||||
isRunning bool //no need to sync
|
||||
isRunning *atomic.Value
|
||||
hookStore *HookStore //cache the hook here to avoid requesting backend
|
||||
}
|
||||
|
||||
//NewRedisJobStatsManager is constructor of RedisJobStatsManager
|
||||
func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager {
|
||||
isRunning := &atomic.Value{}
|
||||
isRunning.Store(false)
|
||||
|
||||
return &RedisJobStatsManager{
|
||||
namespace: namespace,
|
||||
context: ctx,
|
||||
@ -69,16 +73,17 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r
|
||||
doneChan: make(chan struct{}, 1),
|
||||
processChan: make(chan *queueItem, processBufferSize),
|
||||
hookStore: NewHookStore(),
|
||||
isRunning: isRunning,
|
||||
}
|
||||
}
|
||||
|
||||
//Start is implementation of same method in JobStatsManager interface.
|
||||
func (rjs *RedisJobStatsManager) Start() {
|
||||
if rjs.isRunning {
|
||||
if rjs.isRunning.Load().(bool) {
|
||||
return
|
||||
}
|
||||
go rjs.loop()
|
||||
rjs.isRunning = true
|
||||
rjs.isRunning.Store(true)
|
||||
|
||||
logger.Info("Redis job stats manager is started")
|
||||
}
|
||||
@ -86,10 +91,10 @@ func (rjs *RedisJobStatsManager) Start() {
|
||||
//Shutdown is implementation of same method in JobStatsManager interface.
|
||||
func (rjs *RedisJobStatsManager) Shutdown() {
|
||||
defer func() {
|
||||
rjs.isRunning = false
|
||||
rjs.isRunning.Store(false)
|
||||
}()
|
||||
|
||||
if !rjs.isRunning {
|
||||
if !(rjs.isRunning.Load().(bool)) {
|
||||
return
|
||||
}
|
||||
rjs.stopChan <- struct{}{}
|
||||
@ -139,7 +144,7 @@ func (rjs *RedisJobStatsManager) loop() {
|
||||
controlChan := make(chan struct{})
|
||||
|
||||
defer func() {
|
||||
rjs.isRunning = false
|
||||
rjs.isRunning.Store(false)
|
||||
//Notify other sub goroutines
|
||||
close(controlChan)
|
||||
logger.Info("Redis job stats manager is stopped")
|
||||
@ -153,6 +158,8 @@ func (rjs *RedisJobStatsManager) loop() {
|
||||
if err := rjs.process(item); err != nil {
|
||||
item.fails++
|
||||
if item.fails < maxFails {
|
||||
logger.Warningf("Failed to process '%s' request with error: %s\n", item.op, err)
|
||||
|
||||
//Retry after a random interval
|
||||
go func() {
|
||||
timer := time.NewTimer(time.Duration(backoff(item.fails)) * time.Second)
|
||||
|
@ -1,6 +1,269 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
package opm
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
func TestRetrieveJob(t *testing.T) {}
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/models"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
dialConnectionTimeout = 30 * time.Second
|
||||
healthCheckPeriod = time.Minute
|
||||
dialReadTimeout = healthCheckPeriod + 10*time.Second
|
||||
dialWriteTimeout = 10 * time.Second
|
||||
testingRedisHost = "REDIS_HOST"
|
||||
testingNamespace = "testing_job_service_v2"
|
||||
)
|
||||
|
||||
var redisHost = getRedisHost()
|
||||
var redisPool = &redis.Pool{
|
||||
MaxActive: 2,
|
||||
MaxIdle: 2,
|
||||
Wait: true,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
return redis.Dial(
|
||||
"tcp",
|
||||
fmt.Sprintf("%s:%d", redisHost, 6379),
|
||||
redis.DialConnectTimeout(dialConnectionTimeout),
|
||||
redis.DialReadTimeout(dialReadTimeout),
|
||||
redis.DialWriteTimeout(dialWriteTimeout),
|
||||
)
|
||||
},
|
||||
}
|
||||
|
||||
func TestSetJobStatus(t *testing.T) {
|
||||
mgr := createStatsManager(redisPool)
|
||||
mgr.Start()
|
||||
defer mgr.Shutdown()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
//make sure data existing
|
||||
testingStats := createFakeStats()
|
||||
mgr.Save(testingStats)
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
mgr.SetJobStatus("fake_job_ID", "running")
|
||||
<-time.After(100 * time.Millisecond)
|
||||
stats, err := mgr.Retrieve("fake_job_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if stats.Stats.Status != "running" {
|
||||
t.Errorf("expect job status 'running' but got '%s'\n", stats.Stats.Status)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommand(t *testing.T) {
|
||||
mgr := createStatsManager(redisPool)
|
||||
mgr.Start()
|
||||
defer mgr.Shutdown()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
if cmd != CtlCommandStop {
|
||||
t.Errorf("expect '%s' but got '%s'", CtlCommandStop, cmd)
|
||||
}
|
||||
}
|
||||
|
||||
key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDieAt(t *testing.T) {
|
||||
mgr := createStatsManager(redisPool)
|
||||
mgr.Start()
|
||||
defer mgr.Shutdown()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
testingStats := createFakeStats()
|
||||
mgr.Save(testingStats)
|
||||
|
||||
dieAt := time.Now().Unix()
|
||||
if err := createDeadJob(redisPool.Get(), dieAt); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
<-time.After(200 * time.Millisecond)
|
||||
mgr.DieAt("fake_job_ID", dieAt)
|
||||
<-time.After(300 * time.Millisecond)
|
||||
|
||||
stats, err := mgr.Retrieve("fake_job_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if stats.Stats.DieAt != dieAt {
|
||||
t.Errorf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
key2 := utils.RedisKeyDead(testingNamespace)
|
||||
if err := clear(key2, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterHook(t *testing.T) {
|
||||
mgr := createStatsManager(redisPool)
|
||||
mgr.Start()
|
||||
defer mgr.Shutdown()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpireJobStats(t *testing.T) {
|
||||
mgr := createStatsManager(redisPool)
|
||||
mgr.Start()
|
||||
defer mgr.Shutdown()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
//make sure data existing
|
||||
testingStats := createFakeStats()
|
||||
mgr.Save(testingStats)
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckIn(t *testing.T) {
|
||||
mgr := createStatsManager(redisPool)
|
||||
mgr.Start()
|
||||
defer mgr.Shutdown()
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
//make sure data existing
|
||||
testingStats := createFakeStats()
|
||||
mgr.Save(testingStats)
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
//Start http server
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintln(w, "ok")
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
mgr.CheckIn("fake_job_ID", "checkin")
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
stats, err := mgr.Retrieve("fake_job_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if stats.Stats.CheckIn != "checkin" {
|
||||
t.Errorf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func getRedisHost() string {
|
||||
redisHost := os.Getenv(testingRedisHost)
|
||||
if redisHost == "" {
|
||||
redisHost = "10.160.178.186" //for local test
|
||||
}
|
||||
|
||||
return redisHost
|
||||
}
|
||||
|
||||
func createStatsManager(redisPool *redis.Pool) JobStatsManager {
|
||||
ctx := context.Background()
|
||||
return NewRedisJobStatsManager(ctx, testingNamespace, redisPool)
|
||||
}
|
||||
|
||||
func clear(key string, conn redis.Conn) error {
|
||||
if conn != nil {
|
||||
defer conn.Close()
|
||||
_, err := conn.Do("DEL", key)
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.New("failed to clear")
|
||||
}
|
||||
|
||||
func createFakeStats() models.JobStats {
|
||||
testingStats := models.JobStats{
|
||||
Stats: &models.JobStatData{
|
||||
JobID: "fake_job_ID",
|
||||
JobKind: job.JobKindPeriodic,
|
||||
JobName: "fake_job",
|
||||
Status: "Pending",
|
||||
IsUnique: false,
|
||||
RefLink: "/api/v1/jobs/fake_job_ID",
|
||||
CronSpec: "5 * * * * *",
|
||||
EnqueueTime: time.Now().Unix(),
|
||||
UpdateTime: time.Now().Unix(),
|
||||
},
|
||||
}
|
||||
|
||||
return testingStats
|
||||
}
|
||||
|
||||
func createDeadJob(conn redis.Conn, dieAt int64) error {
|
||||
dead := make(map[string]interface{})
|
||||
dead["name"] = "fake_job"
|
||||
dead["id"] = "fake_job_ID"
|
||||
dead["args"] = make(map[string]interface{})
|
||||
dead["fails"] = 3
|
||||
dead["err"] = "testing error"
|
||||
dead["failed_at"] = dieAt
|
||||
|
||||
rawJSON, err := json.Marshal(&dead)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer conn.Close()
|
||||
key := utils.RedisKeyDead(testingNamespace)
|
||||
_, err = conn.Do("ZADD", key, dieAt, rawJSON)
|
||||
return err
|
||||
}
|
||||
|
68
src/jobservice_v2/period/job_policy_test.go
Normal file
68
src/jobservice_v2/period/job_policy_test.go
Normal file
@ -0,0 +1,68 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
package period
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPeriodicJobPolicy(t *testing.T) {
|
||||
p := createPolicy("")
|
||||
|
||||
data, err := p.Serialize()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := p.DeSerialize(data); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeriodicJobPolicyStore(t *testing.T) {
|
||||
ps := &periodicJobPolicyStore{
|
||||
lock: new(sync.RWMutex),
|
||||
policies: make(map[string]*PeriodicJobPolicy),
|
||||
}
|
||||
|
||||
ps.add(createPolicy("fake_ID_Steven"))
|
||||
if ps.size() != 1 {
|
||||
t.Errorf("expect size 1 but got '%d'\n", ps.size())
|
||||
}
|
||||
pl := make([]*PeriodicJobPolicy, 0)
|
||||
pl = append(pl, createPolicy(""))
|
||||
pl = append(pl, createPolicy(""))
|
||||
ps.addAll(pl)
|
||||
if ps.size() != 3 {
|
||||
t.Errorf("expect size 3 but got '%d'\n", ps.size())
|
||||
}
|
||||
|
||||
l := ps.list()
|
||||
if l == nil || len(l) != 3 {
|
||||
t.Error("expect a policy list with 3 items but got invalid list")
|
||||
}
|
||||
|
||||
rp := ps.remove("fake_ID_Steven")
|
||||
if rp == nil {
|
||||
t.Error("expect none nil policy object but got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func createPolicy(id string) *PeriodicJobPolicy {
|
||||
theID := id
|
||||
if theID == "" {
|
||||
theID = fmt.Sprintf("fake_ID_%d", time.Now().UnixNano()+int64(rand.Intn(1000)))
|
||||
}
|
||||
p := &PeriodicJobPolicy{
|
||||
PolicyID: theID,
|
||||
JobName: "fake_job",
|
||||
JobParameters: make(map[string]interface{}),
|
||||
CronSpec: "5 * * * * *",
|
||||
}
|
||||
p.JobParameters["image"] = "testing:v1"
|
||||
|
||||
return p
|
||||
}
|
@ -15,6 +15,7 @@
|
||||
package replicator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/vmware/harbor/src/common/dao"
|
||||
@ -83,8 +84,8 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error {
|
||||
Metadata: &job_models.JobMetadata{
|
||||
JobKind: common_job.JobKindGeneric,
|
||||
},
|
||||
// TODO
|
||||
StatusHook: "",
|
||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d",
|
||||
config.InternalUIURL(), id),
|
||||
}
|
||||
|
||||
if operation == common_models.RepOpTransfer {
|
||||
|
@ -21,11 +21,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/common/dao"
|
||||
common_job "github.com/vmware/harbor/src/common/job"
|
||||
"github.com/vmware/harbor/src/common/models"
|
||||
"github.com/vmware/harbor/src/common/utils/log"
|
||||
"github.com/vmware/harbor/src/replication/core"
|
||||
api_models "github.com/vmware/harbor/src/ui/api/models"
|
||||
"github.com/vmware/harbor/src/ui/config"
|
||||
"github.com/vmware/harbor/src/ui/utils"
|
||||
)
|
||||
|
||||
@ -176,10 +176,17 @@ func (ra *RepJobAPI) GetLog() {
|
||||
return
|
||||
}
|
||||
|
||||
url := buildJobLogURL(strconv.FormatInt(ra.jobID, 10), ReplicationJobType)
|
||||
err = utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&ra.BaseAPI))
|
||||
logBytes, err := utils.GetJobServiceClient().GetJobLog(job.UUID)
|
||||
if err != nil {
|
||||
ra.RenderError(http.StatusInternalServerError, err.Error())
|
||||
ra.HandleInternalServerError(fmt.Sprintf("failed to get log of job %s: %v",
|
||||
job.UUID, err))
|
||||
return
|
||||
}
|
||||
ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes)))
|
||||
ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain")
|
||||
_, err = ra.Ctx.ResponseWriter.Write(logBytes)
|
||||
if err != nil {
|
||||
ra.HandleInternalServerError(fmt.Sprintf("failed to write log of job %s: %v", job.UUID, err))
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -199,10 +206,17 @@ func (ra *RepJobAPI) StopJobs() {
|
||||
ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID))
|
||||
}
|
||||
|
||||
if err = config.GlobalJobserviceClient.StopReplicationJobs(req.PolicyID); err != nil {
|
||||
ra.HandleInternalServerError(fmt.Sprintf("failed to stop replication jobs of policy %d: %v", req.PolicyID, err))
|
||||
jobs, err := dao.GetRepJobByPolicy(policy.ID)
|
||||
if err != nil {
|
||||
ra.HandleInternalServerError(fmt.Sprintf("failed to list jobs of policy %d: %v", policy.ID, err))
|
||||
return
|
||||
}
|
||||
for _, job := range jobs {
|
||||
if err = utils.GetJobServiceClient().PostAction(job.UUID, common_job.JobActionStop); err != nil {
|
||||
log.Errorf("failed to stop job id-%d uuid-%s: %v", job.ID, job.UUID, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//TODO:add Post handler to call job service API to submit jobs by policy
|
||||
|
@ -37,33 +37,50 @@ var statusMap = map[string]string{
|
||||
// Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice.
|
||||
type Handler struct {
|
||||
api.BaseController
|
||||
id int64
|
||||
status string
|
||||
}
|
||||
|
||||
// HandleScan handles the webhook of scan job
|
||||
func (h *Handler) HandleScan() {
|
||||
// Prepare ...
|
||||
func (h *Handler) Prepare() {
|
||||
id, err := h.GetInt64FromPath(":id")
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get job ID, error: %v", err)
|
||||
//Avoid job service from resending...
|
||||
h.Abort("200")
|
||||
return
|
||||
}
|
||||
h.id = id
|
||||
var data jobmodels.JobStatusChange
|
||||
err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err)
|
||||
h.Abort("200")
|
||||
return
|
||||
}
|
||||
status, ok := statusMap[data.Status]
|
||||
log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status)
|
||||
if ok {
|
||||
if err := dao.UpdateScanJobStatus(id, status); err != nil {
|
||||
log.Errorf("Failed to update job status, id: %d, data: %v", id, data)
|
||||
h.HandleInternalServerError(err.Error())
|
||||
}
|
||||
status, ok := statusMap[data.Status]
|
||||
if !ok {
|
||||
h.Abort("200")
|
||||
return
|
||||
}
|
||||
h.status = status
|
||||
}
|
||||
|
||||
// HandleScan handles the webhook of scan job
|
||||
func (h *Handler) HandleScan() {
|
||||
if err := dao.UpdateScanJobStatus(h.id, h.status); err != nil {
|
||||
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
|
||||
h.HandleInternalServerError(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
//HandleReplication handles the webhook of replication job
|
||||
func (h *Handler) HandleReplication() {
|
||||
if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil {
|
||||
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
|
||||
h.HandleInternalServerError(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -34,3 +34,10 @@ services:
|
||||
- /data/:/data/
|
||||
ports:
|
||||
- 8888:8080
|
||||
redis:
|
||||
image: vmware/redis-photon:4.0
|
||||
restart: always
|
||||
volumes:
|
||||
- /data/redis:/data
|
||||
ports:
|
||||
- 6379:6379
|
||||
|
Loading…
Reference in New Issue
Block a user