Merge branch 'job_service' of https://github.com/vmware/harbor into job_service

This commit is contained in:
Steven Zou 2018-03-28 11:26:36 +08:00
commit 4541d4b53f
8 changed files with 128 additions and 22 deletions

View File

@ -16,7 +16,7 @@ import (
type Client interface {
SubmitJob(*models.JobData) (string, error)
GetJobLog(uuid string) ([]byte, error)
//TODO actions or stop?
//TODO actions or stop? Redirect joblog when we see there's memory issue.
}
// DefaultClient is the default implementation of Client interface
@ -101,5 +101,3 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) {
}
return data, nil
}
//TODO: builder, default client, etc.

View File

@ -3,12 +3,32 @@ package job
const (
//ImageScanJob is name of scan job it will be used as key to register to job service.
ImageScanJob = "IMAGE_SCAN"
// GenericKind marks the job as a generic job, it will be contained in job metadata and passed to job service.
GenericKind = "Generic"
// ImageTransfer : the name of image transfer job in job service
ImageTransfer = "IMAGE_TRANSFER"
// ImageDelete : the name of image delete job in job service
ImageDelete = "IMAGE_DELETE"
// ImageReplicate : the name of image replicate job in job service
ImageReplicate = "IMAGE_REPLICATE"
//JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic"
//JobKindScheduled : Kind of scheduled job
JobKindScheduled = "Scheduled"
//JobKindPeriodic : Kind of periodic job
JobKindPeriodic = "Periodic"
//JobServiceStatusPending : job status pending
JobServiceStatusPending = "Pending"
//JobServiceStatusRunning : job status running
JobServiceStatusRunning = "Running"
//JobServiceStatusStopped : job status stopped
JobServiceStatusStopped = "Stopped"
//JobServiceStatusCancelled : job status cancelled
JobServiceStatusCancelled = "Cancelled"
//JobServiceStatusError : job status error
JobServiceStatusError = "Error"
//JobServiceStatusSuccess : job status success
JobServiceStatusSuccess = "Success"
//JobServiceStatusScheduled : job status scheduled
JobServiceStatusScheduled = "Scheduled"
)

View File

@ -51,8 +51,7 @@ func (cj *ClairJob) Validate(params map[string]interface{}) error {
// Run implements the interface in job/Interface
func (cj *ClairJob) Run(ctx env.JobContext, params map[string]interface{}) error {
// TODO: get logger from ctx?
logger := log.DefaultLogger()
logger := ctx.GetLogger()
jobParms, err := transformParam(params)
if err != nil {
@ -79,7 +78,11 @@ func (cj *ClairJob) Run(ctx env.JobContext, params map[string]interface{}) error
logger.Errorf("Failed to prepare layers, error: %v", err)
return err
}
clairClient := clair.NewClient(jobParms.ClairEndpoint, logger)
loggerImpl, ok := logger.(*log.Logger)
if !ok {
loggerImpl = log.DefaultLogger()
}
clairClient := clair.NewClient(jobParms.ClairEndpoint, loggerImpl)
for _, l := range layers {
logger.Infof("Scanning Layer: %s, path: %s", l.Name, l.Path)

View File

@ -19,6 +19,7 @@ import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/ui/utils"
"fmt"
"net/http"
"strconv"
"strings"
@ -29,6 +30,7 @@ type ScanJobAPI struct {
BaseController
jobID int64
projectName string
jobUUID string
}
// Prepare validates that whether user has read permission to the project of the repo the scan job scanned.
@ -55,14 +57,21 @@ func (sj *ScanJobAPI) Prepare() {
sj.HandleForbidden(sj.SecurityCtx.GetUsername())
}
sj.projectName = projectName
sj.jobUUID = data.UUID
}
//GetLog ...
func (sj *ScanJobAPI) GetLog() {
url := buildJobLogURL(strconv.FormatInt(sj.jobID, 10), ScanJobType)
err := utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&sj.BaseAPI))
logBytes, err := utils.GetJobServiceClient().GetJobLog(sj.jobUUID)
if err != nil {
sj.RenderError(http.StatusInternalServerError, err.Error())
sj.HandleInternalServerError(fmt.Sprintf("Failed to get job logs, uuid: %s, error: %v", sj.jobUUID, err))
return
}
sj.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes)))
sj.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain")
_, err = sj.Ctx.ResponseWriter.Write(logBytes)
if err != nil {
sj.HandleInternalServerError(fmt.Sprintf("Failed to write job logs, uuid: %s, error: %v", sj.jobUUID, err))
}
}

View File

@ -304,17 +304,20 @@ func InternalJobServiceURL() string {
return strings.TrimSuffix(cfg[common.JobServiceURL].(string), "/")
}
// InternalTokenServiceEndpoint returns token service endpoint for internal communication between Harbor containers
func InternalTokenServiceEndpoint() string {
uiURL := common.DefaultUIEndpoint
// InternalUIURL returns the local ui url
func InternalUIURL() string {
cfg, err := mg.Get()
if err != nil {
log.Warningf("Failed to Get job service UI URL from backend, error: %v, will use default value.")
} else {
uiURL = cfg[common.UIURL].(string)
log.Warningf("Failed to Get job service UI URL from backend, error: %v, will return default value.")
return common.DefaultUIEndpoint
}
return strings.TrimSuffix(uiURL, "/") + "/service/token"
return strings.TrimSuffix(cfg[common.UIURL].(string), "/")
}
// InternalTokenServiceEndpoint returns token service endpoint for internal communication between Harbor containers
func InternalTokenServiceEndpoint() string {
return InternalUIURL() + "/service/token"
}
// InternalNotaryEndpoint returns notary server endpoint for internal communication between Harbor containers

View File

@ -19,6 +19,7 @@ import (
"github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/controllers"
"github.com/vmware/harbor/src/ui/service/notifications/clair"
"github.com/vmware/harbor/src/ui/service/notifications/jobs"
"github.com/vmware/harbor/src/ui/service/notifications/registry"
"github.com/vmware/harbor/src/ui/service/token"
@ -112,10 +113,12 @@ func initRouters() {
//external service that hosted on harbor process:
beego.Router("/service/notifications", &registry.NotificationHandler{})
beego.Router("/service/notifications/clair", &clair.Handler{}, "post:Handle")
beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan")
beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication")
beego.Router("/service/token", &token.Handler{})
beego.Router("/registryproxy/*", &controllers.RegistryProxy{}, "*:Handle")
//Error pages
beego.ErrorController(&controllers.ErrorController{})

View File

@ -0,0 +1,69 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jobs
import (
"encoding/json"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/job"
jobmodels "github.com/vmware/harbor/src/common/job/models"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/ui/api"
)
var statusMap = map[string]string{
job.JobServiceStatusPending: models.JobPending,
job.JobServiceStatusRunning: models.JobRunning,
job.JobServiceStatusStopped: models.JobStopped,
job.JobServiceStatusCancelled: models.JobCanceled,
job.JobServiceStatusError: models.JobError,
job.JobServiceStatusSuccess: models.JobFinished,
}
// Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice.
type Handler struct {
api.BaseController
}
// HandleScan handles the webhook of scan job
func (h *Handler) HandleScan() {
id, err := h.GetInt64FromPath(":id")
if err != nil {
log.Errorf("Failed to get job ID, error: %v", err)
//Avoid job service from resending...
return
}
var data jobmodels.JobStatusChange
err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data)
if err != nil {
log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err)
return
}
status, ok := statusMap[data.Status]
log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status)
if ok {
if err := dao.UpdateScanJobStatus(id, status); err != nil {
log.Errorf("Failed to update job status, id: %d, data: %v", id, data)
h.HandleInternalServerError(err.Error())
}
}
}
//HandleReplication handles the webhook of replication job
func (h *Handler) HandleReplication() {
}

View File

@ -25,6 +25,7 @@ import (
"github.com/vmware/harbor/src/ui/config"
"encoding/json"
"fmt"
"sync"
)
@ -164,7 +165,7 @@ func buildScanJobData(jobID int64, repository, tag, digest string) (*jobmodels.J
return nil, err
}
meta := jobmodels.JobMetadata{
JobKind: job.GenericKind,
JobKind: job.JobKindGeneric,
IsUnique: false,
}
@ -172,7 +173,7 @@ func buildScanJobData(jobID int64, repository, tag, digest string) (*jobmodels.J
Name: job.ImageScanJob,
Parameters: jobmodels.Parameters(parmsMap),
Metadata: &meta,
StatusHook: "",
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/scan/%d", config.InternalUIURL(), jobID),
}
return data, nil