Handle replication job status hook

This commit is contained in:
Wenkai Yin 2018-03-24 20:22:24 +08:00
parent d28bad4806
commit dd156ca243
5 changed files with 64 additions and 17 deletions

View File

@ -16,7 +16,8 @@ import (
type Client interface {
SubmitJob(*models.JobData) (string, error)
GetJobLog(uuid string) ([]byte, error)
//TODO actions or stop? Redirect joblog when we see there's memory issue.
PostAction(uuid, action string) error
//TODO Redirect joblog when we see there's memory issue.
}
// DefaultClient is the default implementation of Client interface
@ -101,3 +102,14 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) {
}
return data, nil
}
// PostAction call jobservice's API to operate action for job specified by uuid
func (d *DefaultClient) PostAction(uuid, action string) error {
url := d.endpoint + "/api/v1/jobs/" + uuid
req := struct {
Action string `json:"action"`
}{
Action: action,
}
return d.client.Post(url, req)
}

View File

@ -31,4 +31,7 @@ const (
JobServiceStatusSuccess = "Success"
//JobServiceStatusScheduled : job status scheduled
JobServiceStatusScheduled = "Scheduled"
// JobActionStop : the action to stop the job
JobActionStop = "stop"
)

View File

@ -15,6 +15,7 @@
package replicator
import (
"fmt"
"strings"
"github.com/vmware/harbor/src/common/dao"
@ -83,8 +84,8 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error {
Metadata: &job_models.JobMetadata{
JobKind: common_job.JobKindGeneric,
},
// TODO
StatusHook: "",
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d",
config.InternalUIURL(), id),
}
if operation == common_models.RepOpTransfer {

View File

@ -21,11 +21,11 @@ import (
"time"
"github.com/vmware/harbor/src/common/dao"
common_job "github.com/vmware/harbor/src/common/job"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication/core"
api_models "github.com/vmware/harbor/src/ui/api/models"
"github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/utils"
)
@ -176,10 +176,17 @@ func (ra *RepJobAPI) GetLog() {
return
}
url := buildJobLogURL(strconv.FormatInt(ra.jobID, 10), ReplicationJobType)
err = utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&ra.BaseAPI))
logBytes, err := utils.GetJobServiceClient().GetJobLog(job.UUID)
if err != nil {
ra.RenderError(http.StatusInternalServerError, err.Error())
ra.HandleInternalServerError(fmt.Sprintf("failed to get log of job %s: %v",
job.UUID, err))
return
}
ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes)))
ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain")
_, err = ra.Ctx.ResponseWriter.Write(logBytes)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to write log of job %s: %v", job.UUID, err))
return
}
}
@ -199,10 +206,17 @@ func (ra *RepJobAPI) StopJobs() {
ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID))
}
if err = config.GlobalJobserviceClient.StopReplicationJobs(req.PolicyID); err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to stop replication jobs of policy %d: %v", req.PolicyID, err))
jobs, err := dao.GetRepJobByPolicy(policy.ID)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to list jobs of policy %d: %v", policy.ID, err))
return
}
for _, job := range jobs {
if err = utils.GetJobServiceClient().PostAction(job.UUID, common_job.JobActionStop); err != nil {
log.Errorf("failed to stop job id-%d uuid-%s: %v", job.ID, job.UUID, err)
continue
}
}
}
//TODO:add Post handler to call job service API to submit jobs by policy

View File

@ -37,33 +37,50 @@ var statusMap = map[string]string{
// Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice.
type Handler struct {
api.BaseController
id int64
status string
}
// HandleScan handles the webhook of scan job
func (h *Handler) HandleScan() {
// Prepare ...
func (h *Handler) Prepare() {
id, err := h.GetInt64FromPath(":id")
if err != nil {
log.Errorf("Failed to get job ID, error: %v", err)
//Avoid job service from resending...
h.Abort("200")
return
}
h.id = id
var data jobmodels.JobStatusChange
err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data)
if err != nil {
log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err)
h.Abort("200")
return
}
status, ok := statusMap[data.Status]
log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status)
if ok {
if err := dao.UpdateScanJobStatus(id, status); err != nil {
log.Errorf("Failed to update job status, id: %d, data: %v", id, data)
h.HandleInternalServerError(err.Error())
}
status, ok := statusMap[data.Status]
if !ok {
h.Abort("200")
return
}
h.status = status
}
// HandleScan handles the webhook of scan job
func (h *Handler) HandleScan() {
if err := dao.UpdateScanJobStatus(h.id, h.status); err != nil {
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
h.HandleInternalServerError(err.Error())
return
}
}
//HandleReplication handles the webhook of replication job
func (h *Handler) HandleReplication() {
if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil {
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
h.HandleInternalServerError(err.Error())
return
}
}