Merge pull request #4512 from ywk253100/180328_replication_hook

Handle replication job status hook
This commit is contained in:
Wenkai Yin 2018-03-28 16:10:51 +08:00 committed by GitHub
commit 22e06c46e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 64 additions and 17 deletions

View File

@ -16,7 +16,8 @@ import (
type Client interface { type Client interface {
SubmitJob(*models.JobData) (string, error) SubmitJob(*models.JobData) (string, error)
GetJobLog(uuid string) ([]byte, error) GetJobLog(uuid string) ([]byte, error)
//TODO actions or stop? Redirect joblog when we see there's memory issue. PostAction(uuid, action string) error
//TODO Redirect joblog when we see there's memory issue.
} }
// DefaultClient is the default implementation of Client interface // DefaultClient is the default implementation of Client interface
@ -101,3 +102,14 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) {
} }
return data, nil return data, nil
} }
// PostAction call jobservice's API to operate action for job specified by uuid
func (d *DefaultClient) PostAction(uuid, action string) error {
url := d.endpoint + "/api/v1/jobs/" + uuid
req := struct {
Action string `json:"action"`
}{
Action: action,
}
return d.client.Post(url, req)
}

View File

@ -31,4 +31,7 @@ const (
JobServiceStatusSuccess = "Success" JobServiceStatusSuccess = "Success"
//JobServiceStatusScheduled : job status scheduled //JobServiceStatusScheduled : job status scheduled
JobServiceStatusScheduled = "Scheduled" JobServiceStatusScheduled = "Scheduled"
// JobActionStop : the action to stop the job
JobActionStop = "stop"
) )

View File

@ -15,6 +15,7 @@
package replicator package replicator
import ( import (
"fmt"
"strings" "strings"
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
@ -83,8 +84,8 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error {
Metadata: &job_models.JobMetadata{ Metadata: &job_models.JobMetadata{
JobKind: common_job.JobKindGeneric, JobKind: common_job.JobKindGeneric,
}, },
// TODO StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d",
StatusHook: "", config.InternalUIURL(), id),
} }
if operation == common_models.RepOpTransfer { if operation == common_models.RepOpTransfer {

View File

@ -21,11 +21,11 @@ import (
"time" "time"
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
common_job "github.com/vmware/harbor/src/common/job"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication/core" "github.com/vmware/harbor/src/replication/core"
api_models "github.com/vmware/harbor/src/ui/api/models" api_models "github.com/vmware/harbor/src/ui/api/models"
"github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/utils" "github.com/vmware/harbor/src/ui/utils"
) )
@ -176,10 +176,17 @@ func (ra *RepJobAPI) GetLog() {
return return
} }
url := buildJobLogURL(strconv.FormatInt(ra.jobID, 10), ReplicationJobType) logBytes, err := utils.GetJobServiceClient().GetJobLog(job.UUID)
err = utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&ra.BaseAPI))
if err != nil { if err != nil {
ra.RenderError(http.StatusInternalServerError, err.Error()) ra.HandleInternalServerError(fmt.Sprintf("failed to get log of job %s: %v",
job.UUID, err))
return
}
ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes)))
ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain")
_, err = ra.Ctx.ResponseWriter.Write(logBytes)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to write log of job %s: %v", job.UUID, err))
return return
} }
} }
@ -199,10 +206,17 @@ func (ra *RepJobAPI) StopJobs() {
ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID)) ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID))
} }
if err = config.GlobalJobserviceClient.StopReplicationJobs(req.PolicyID); err != nil { jobs, err := dao.GetRepJobByPolicy(policy.ID)
ra.HandleInternalServerError(fmt.Sprintf("failed to stop replication jobs of policy %d: %v", req.PolicyID, err)) if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to list jobs of policy %d: %v", policy.ID, err))
return return
} }
for _, job := range jobs {
if err = utils.GetJobServiceClient().PostAction(job.UUID, common_job.JobActionStop); err != nil {
log.Errorf("failed to stop job id-%d uuid-%s: %v", job.ID, job.UUID, err)
continue
}
}
} }
//TODO:add Post handler to call job service API to submit jobs by policy //TODO:add Post handler to call job service API to submit jobs by policy

View File

@ -37,33 +37,50 @@ var statusMap = map[string]string{
// Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice. // Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice.
type Handler struct { type Handler struct {
api.BaseController api.BaseController
id int64
status string
} }
// HandleScan handles the webhook of scan job // Prepare ...
func (h *Handler) HandleScan() { func (h *Handler) Prepare() {
id, err := h.GetInt64FromPath(":id") id, err := h.GetInt64FromPath(":id")
if err != nil { if err != nil {
log.Errorf("Failed to get job ID, error: %v", err) log.Errorf("Failed to get job ID, error: %v", err)
//Avoid job service from resending... //Avoid job service from resending...
h.Abort("200")
return return
} }
h.id = id
var data jobmodels.JobStatusChange var data jobmodels.JobStatusChange
err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data) err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data)
if err != nil { if err != nil {
log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err) log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err)
h.Abort("200")
return return
} }
status, ok := statusMap[data.Status]
log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status) log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status)
if ok { status, ok := statusMap[data.Status]
if err := dao.UpdateScanJobStatus(id, status); err != nil { if !ok {
log.Errorf("Failed to update job status, id: %d, data: %v", id, data) h.Abort("200")
h.HandleInternalServerError(err.Error()) return
}
} }
h.status = status
}
// HandleScan handles the webhook of scan job
func (h *Handler) HandleScan() {
if err := dao.UpdateScanJobStatus(h.id, h.status); err != nil {
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
h.HandleInternalServerError(err.Error())
return
}
} }
//HandleReplication handles the webhook of replication job //HandleReplication handles the webhook of replication job
func (h *Handler) HandleReplication() { func (h *Handler) HandleReplication() {
if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil {
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
h.HandleInternalServerError(err.Error())
return
}
} }