From be97a91650ec92d24e547980dbae9cb20a631a8f Mon Sep 17 00:00:00 2001 From: Tan Jiang Date: Tue, 27 Mar 2018 21:27:52 +0800 Subject: [PATCH 1/2] Integrate with jobservice webhook --- src/common/job/client.go | 4 +- src/common/job/const.go | 24 ++++++- src/jobservice_v2/job/impl/scan/job.go | 9 ++- src/ui/api/scan_job.go | 15 ++++- src/ui/config/config.go | 19 +++--- src/ui/router.go | 5 +- src/ui/service/notifications/jobs/handler.go | 67 ++++++++++++++++++++ src/ui/utils/job.go | 5 +- 8 files changed, 126 insertions(+), 22 deletions(-) create mode 100644 src/ui/service/notifications/jobs/handler.go diff --git a/src/common/job/client.go b/src/common/job/client.go index e74ef344d..078e6ddc3 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -16,7 +16,7 @@ import ( type Client interface { SubmitJob(*models.JobData) (string, error) GetJobLog(uuid string) ([]byte, error) - //TODO actions or stop? + //TODO actions or stop? Redirect joblog when we see there's memory issue. } // DefaultClient is the default implementation of Client interface @@ -101,5 +101,3 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) { } return data, nil } - -//TODO: builder, default client, etc. diff --git a/src/common/job/const.go b/src/common/job/const.go index d8a867f60..900efe26a 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -3,12 +3,32 @@ package job const ( //ImageScanJob is name of scan job it will be used as key to register to job service. ImageScanJob = "IMAGE_SCAN" - // GenericKind marks the job as a generic job, it will be contained in job metadata and passed to job service. - GenericKind = "Generic" // ImageTransfer : the name of image transfer job in job service ImageTransfer = "IMAGE_TRANSFER" // ImageDelete : the name of image delete job in job service ImageDelete = "IMAGE_DELETE" // ImageReplicate : the name of image replicate job in job service ImageReplicate = "IMAGE_REPLICATE" + + //JobKindGeneric : Kind of generic job + JobKindGeneric = "Generic" + //JobKindScheduled : Kind of scheduled job + JobKindScheduled = "Scheduled" + //JobKindPeriodic : Kind of periodic job + JobKindPeriodic = "Periodic" + + //JobServiceStatusPending : job status pending + JobServiceStatusPending = "Pending" + //JobServiceStatusRunning : job status running + JobServiceStatusRunning = "Running" + //JobServiceStatusStopped : job status stopped + JobServiceStatusStopped = "Stopped" + //JobServiceStatusCancelled : job status cancelled + JobServiceStatusCancelled = "Cancelled" + //JobServiceStatusError : job status error + JobServiceStatusError = "Error" + //JobServiceStatusSuccess : job status success + JobServiceStatusSuccess = "Success" + //JobServiceStatusScheduled : job status scheduled + JobServiceStatusScheduled = "Scheduled" ) diff --git a/src/jobservice_v2/job/impl/scan/job.go b/src/jobservice_v2/job/impl/scan/job.go index b002bf15f..952a20131 100644 --- a/src/jobservice_v2/job/impl/scan/job.go +++ b/src/jobservice_v2/job/impl/scan/job.go @@ -51,8 +51,7 @@ func (cj *ClairJob) Validate(params map[string]interface{}) error { // Run implements the interface in job/Interface func (cj *ClairJob) Run(ctx env.JobContext, params map[string]interface{}) error { - // TODO: get logger from ctx? - logger := log.DefaultLogger() + logger := ctx.GetLogger() jobParms, err := transformParam(params) if err != nil { @@ -79,7 +78,11 @@ func (cj *ClairJob) Run(ctx env.JobContext, params map[string]interface{}) error logger.Errorf("Failed to prepare layers, error: %v", err) return err } - clairClient := clair.NewClient(jobParms.ClairEndpoint, logger) + loggerImpl, ok := logger.(*log.Logger) + if !ok { + loggerImpl = log.DefaultLogger() + } + clairClient := clair.NewClient(jobParms.ClairEndpoint, loggerImpl) for _, l := range layers { logger.Infof("Scanning Layer: %s, path: %s", l.Name, l.Path) diff --git a/src/ui/api/scan_job.go b/src/ui/api/scan_job.go index c600ab3d5..0e264db61 100644 --- a/src/ui/api/scan_job.go +++ b/src/ui/api/scan_job.go @@ -19,6 +19,7 @@ import ( "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/ui/utils" + "fmt" "net/http" "strconv" "strings" @@ -29,6 +30,7 @@ type ScanJobAPI struct { BaseController jobID int64 projectName string + jobUUID string } // Prepare validates that whether user has read permission to the project of the repo the scan job scanned. @@ -55,14 +57,21 @@ func (sj *ScanJobAPI) Prepare() { sj.HandleForbidden(sj.SecurityCtx.GetUsername()) } sj.projectName = projectName + sj.jobUUID = data.UUID } //GetLog ... func (sj *ScanJobAPI) GetLog() { - url := buildJobLogURL(strconv.FormatInt(sj.jobID, 10), ScanJobType) - err := utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&sj.BaseAPI)) + logBytes, err := utils.GetJobServiceClient().GetJobLog(sj.jobUUID) if err != nil { - sj.RenderError(http.StatusInternalServerError, err.Error()) + sj.HandleInternalServerError(fmt.Sprintf("Failed to get job logs, uuid: %s, error: %v", sj.jobUUID, err)) return } + sj.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes))) + sj.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain") + _, err = sj.Ctx.ResponseWriter.Write(logBytes) + if err != nil { + sj.HandleInternalServerError(fmt.Sprintf("Failed to write job logs, uuid: %s, error: %v", sj.jobUUID, err)) + } + } diff --git a/src/ui/config/config.go b/src/ui/config/config.go index 7a0ceb625..a16887f58 100644 --- a/src/ui/config/config.go +++ b/src/ui/config/config.go @@ -304,17 +304,20 @@ func InternalJobServiceURL() string { return strings.TrimSuffix(cfg[common.JobServiceURL].(string), "/") } -// InternalTokenServiceEndpoint returns token service endpoint for internal communication between Harbor containers -func InternalTokenServiceEndpoint() string { - uiURL := common.DefaultUIEndpoint +// InternalUIURL returns the local ui url +func InternalUIURL() string { cfg, err := mg.Get() if err != nil { - log.Warningf("Failed to Get job service UI URL from backend, error: %v, will use default value.") - - } else { - uiURL = cfg[common.UIURL].(string) + log.Warningf("Failed to Get job service UI URL from backend, error: %v, will return default value.") + return common.DefaultUIEndpoint } - return strings.TrimSuffix(uiURL, "/") + "/service/token" + return strings.TrimSuffix(cfg[common.UIURL].(string), "/") + +} + +// InternalTokenServiceEndpoint returns token service endpoint for internal communication between Harbor containers +func InternalTokenServiceEndpoint() string { + return InternalUIURL() + "/service/token" } // InternalNotaryEndpoint returns notary server endpoint for internal communication between Harbor containers diff --git a/src/ui/router.go b/src/ui/router.go index 59a0b6288..64277f10a 100644 --- a/src/ui/router.go +++ b/src/ui/router.go @@ -19,6 +19,7 @@ import ( "github.com/vmware/harbor/src/ui/config" "github.com/vmware/harbor/src/ui/controllers" "github.com/vmware/harbor/src/ui/service/notifications/clair" + "github.com/vmware/harbor/src/ui/service/notifications/jobs" "github.com/vmware/harbor/src/ui/service/notifications/registry" "github.com/vmware/harbor/src/ui/service/token" @@ -112,10 +113,12 @@ func initRouters() { //external service that hosted on harbor process: beego.Router("/service/notifications", ®istry.NotificationHandler{}) beego.Router("/service/notifications/clair", &clair.Handler{}, "post:Handle") + beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan") + beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication") beego.Router("/service/token", &token.Handler{}) beego.Router("/registryproxy/*", &controllers.RegistryProxy{}, "*:Handle") - + //Error pages beego.ErrorController(&controllers.ErrorController{}) diff --git a/src/ui/service/notifications/jobs/handler.go b/src/ui/service/notifications/jobs/handler.go new file mode 100644 index 000000000..40a3fa204 --- /dev/null +++ b/src/ui/service/notifications/jobs/handler.go @@ -0,0 +1,67 @@ +// Copyright (c) 2017 VMware, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobs + +import ( + "encoding/json" + + "github.com/vmware/harbor/src/common/dao" + "github.com/vmware/harbor/src/common/job" + jobmodels "github.com/vmware/harbor/src/common/job/models" + "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/ui/api" +) + +var statusMap = map[string]string{ + job.JobServiceStatusPending: models.JobPending, + job.JobServiceStatusRunning: models.JobRunning, + job.JobServiceStatusStopped: models.JobStopped, + job.JobServiceStatusCancelled: models.JobCanceled, + job.JobServiceStatusError: models.JobError, + job.JobServiceStatusSuccess: models.JobFinished, +} + +// Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice. +type Handler struct { + api.BaseController +} + +func (h *Handler) HandleScan() { + id, err := h.GetInt64FromPath(":id") + if err != nil { + log.Errorf("Failed to get job ID, error: %v", err) + //Avoid job service from resending... + return + } + var data jobmodels.JobStatusChange + err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data) + if err != nil { + log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err) + return + } + status, ok := statusMap[data.Status] + log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status) + if ok { + if err := dao.UpdateScanJobStatus(id, status); err != nil { + log.Errorf("Failed to update job status, id: %s, data: %v", id, data) + h.HandleInternalServerError(err.Error()) + } + } + +} + +func (h *Handler) HandleReplication() { +} diff --git a/src/ui/utils/job.go b/src/ui/utils/job.go index 87382a2f5..7cc7f4600 100644 --- a/src/ui/utils/job.go +++ b/src/ui/utils/job.go @@ -25,6 +25,7 @@ import ( "github.com/vmware/harbor/src/ui/config" "encoding/json" + "fmt" "sync" ) @@ -164,7 +165,7 @@ func buildScanJobData(jobID int64, repository, tag, digest string) (*jobmodels.J return nil, err } meta := jobmodels.JobMetadata{ - JobKind: job.GenericKind, + JobKind: job.JobKindGeneric, IsUnique: false, } @@ -172,7 +173,7 @@ func buildScanJobData(jobID int64, repository, tag, digest string) (*jobmodels.J Name: job.ImageScanJob, Parameters: jobmodels.Parameters(parmsMap), Metadata: &meta, - StatusHook: "", + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/scan/%d", config.InternalUIURL(), jobID), } return data, nil From 0abf53c1123a2285bce2e8a1edc0ac5e4db1c9c1 Mon Sep 17 00:00:00 2001 From: Tan Jiang Date: Tue, 27 Mar 2018 21:30:37 +0800 Subject: [PATCH 2/2] fix golint --- src/ui/service/notifications/jobs/handler.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ui/service/notifications/jobs/handler.go b/src/ui/service/notifications/jobs/handler.go index 40a3fa204..ada6126af 100644 --- a/src/ui/service/notifications/jobs/handler.go +++ b/src/ui/service/notifications/jobs/handler.go @@ -39,6 +39,7 @@ type Handler struct { api.BaseController } +// HandleScan handles the webhook of scan job func (h *Handler) HandleScan() { id, err := h.GetInt64FromPath(":id") if err != nil { @@ -56,12 +57,13 @@ func (h *Handler) HandleScan() { log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status) if ok { if err := dao.UpdateScanJobStatus(id, status); err != nil { - log.Errorf("Failed to update job status, id: %s, data: %v", id, data) + log.Errorf("Failed to update job status, id: %d, data: %v", id, data) h.HandleInternalServerError(err.Error()) } } } +//HandleReplication handles the webhook of replication job func (h *Handler) HandleReplication() { }