Trigger scan job from UI.

This commit is contained in:
Tan Jiang 2018-03-26 18:07:21 +08:00
parent 745b21abbc
commit 41ce0891ab
14 changed files with 434 additions and 75 deletions

View File

@ -22,4 +22,9 @@ worker_pool:
port: 6379
namespace: "namespace"
#Logger for job
logger:
path: "/var/log/jobs"
level: "INFO"
archive_period: 14 #days
#Admin server endpoint
admin_server: "http://adminserver:8080/"

View File

@ -1086,6 +1086,16 @@ func TestAddRepJob(t *testing.T) {
}
}
func TestSetRepJobUUID(t *testing.T) {
uuid := "u-rep-job-uuid"
assert := assert.New(t)
err := SetRepJobUUID(jobID, uuid)
assert.Nil(err)
j, err := GetRepJob(jobID)
assert.Nil(err)
assert.Equal(uuid, j.UUID)
}
func TestUpdateRepJobStatus(t *testing.T) {
err := UpdateRepJobStatus(jobID, models.JobFinished)
if err != nil {
@ -1505,6 +1515,21 @@ func TestGetScanJobs(t *testing.T) {
assert.Nil(err)
}
func TestSetScanJobUUID(t *testing.T) {
uuid := "u-scan-job-uuid"
assert := assert.New(t)
id, err := AddScanJob(sj1)
assert.Nil(err)
err = SetScanJobUUID(id, uuid)
assert.Nil(err)
j, err := GetScanJob(id)
assert.Nil(err)
assert.Equal(uuid, j.UUID)
err = ClearTable(models.ScanJobTable)
assert.Nil(err)
}
func TestUpdateScanJobStatus(t *testing.T) {
assert := assert.New(t)
id, err := AddScanJob(sj1)

View File

@ -391,6 +391,20 @@ func UpdateRepJobStatus(id int64, status string) error {
return err
}
// SetRepJobUUID ...
func SetRepJobUUID(id int64, uuid string) error {
o := GetOrmer()
j := models.RepJob{
ID: id,
UUID: uuid,
}
n, err := o.Update(&j, "UUID")
if n == 0 {
log.Warningf("no records are updated when updating replication job %d", id)
}
return err
}
// ResetRunningJobs update all running jobs status to pending, including replication jobs and scan jobs.
func ResetRunningJobs() error {
o := GetOrmer()

View File

@ -84,6 +84,21 @@ func UpdateScanJobStatus(id int64, status string) error {
return err
}
// SetScanJobDigest set UUID to the record so it associates with the job in job service.
func SetScanJobUUID(id int64, uuid string) error {
o := GetOrmer()
sj := models.ScanJob{
ID: id,
UUID: uuid,
}
n, err := o.Update(&sj, "UUID")
if n == 0 {
log.Warningf("no records are updated when updating scan job %d", id)
}
return err
}
func scanJobQs(limit ...int) orm.QuerySeter {
o := GetOrmer()
l := -1

105
src/common/job/client.go Normal file
View File

@ -0,0 +1,105 @@
package job
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"strings"
commonhttp "github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/modifier/auth"
"github.com/vmware/harbor/src/common/job/models"
)
// Client wraps interface to access jobservice.
type Client interface {
SubmitJob(*models.JobData) (string, error)
GetJobLog(uuid string) ([]byte, error)
//TODO actions or stop?
}
// DefaultClient is the default implementation of Client interface
type DefaultClient struct {
endpoint string
client *commonhttp.Client
}
// NewDefaultClient creates a default client based on endpoint and secret.
func NewDefaultClient(endpoint, secret string) *DefaultClient {
var c *commonhttp.Client
if len(secret) > 0 {
c = commonhttp.NewClient(nil, auth.NewSecretAuthorizer(secret))
} else {
c = commonhttp.NewClient(nil)
}
e := strings.TrimRight(endpoint, "/")
return &DefaultClient{
endpoint: e,
client: c,
}
}
//SubmitJob call jobserivce API to submit a job and returns the job's UUID.
func (d *DefaultClient) SubmitJob(jd *models.JobData) (string, error) {
url := d.endpoint + "/api/v1/jobs"
jq := models.JobRequest{
Job: jd,
}
b, err := json.Marshal(jq)
if err != nil {
return "", err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := d.client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusAccepted {
return "", &commonhttp.Error{
Code: resp.StatusCode,
Message: string(data),
}
}
stats := &models.JobStats{}
if err := json.Unmarshal(data, stats); err != nil {
return "", err
}
return stats.Stats.JobID, nil
}
//GetJobLog call jobserivce API to get the log of a job. It only accepts the UUID of the job
func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) {
url := d.endpoint + "/api/v1/jobs/" + uuid + "/log"
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := d.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, &commonhttp.Error{
Code: resp.StatusCode,
Message: string(data),
}
}
return data, nil
}
//TODO: builder, default client, etc.

View File

@ -9,4 +9,6 @@ const (
ImageReplicationDelete = "IMAGE_REPLICATION_DELETE"
// ImagePeriodReplication : the name of period replication job in job service
ImagePeriodReplication = "IMAGE_PERIOD_REPLICATION"
// GenericKind marks the job as a generic job, it will be contained in job metadata and passed to job service.
GenericKind = "Generic"
)

View File

@ -0,0 +1,83 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package models
//Parameters for job execution.
type Parameters map[string]interface{}
//JobRequest is the request of launching a job.
type JobRequest struct {
Job *JobData `json:"job"`
}
//JobData keeps the basic info.
type JobData struct {
Name string `json:"name"`
Parameters Parameters `json:"parameters"`
Metadata *JobMetadata `json:"metadata"`
StatusHook string `json:"status_hook"`
}
//JobMetadata stores the metadata of job.
type JobMetadata struct {
JobKind string `json:"kind"`
ScheduleDelay uint64 `json:"schedule_delay,omitempty"`
Cron string `json:"cron_spec,omitempty"`
IsUnique bool `json:"unique"`
}
//JobStats keeps the result of job launching.
type JobStats struct {
Stats *JobStatData `json:"job"`
}
//JobStatData keeps the stats of job
type JobStatData struct {
JobID string `json:"id"`
Status string `json:"status"`
JobName string `json:"name"`
JobKind string `json:"kind"`
IsUnique bool `json:"unique"`
RefLink string `json:"ref_link,omitempty"`
CronSpec string `json:"cron_spec,omitempty"`
EnqueueTime int64 `json:"enqueue_time"`
UpdateTime int64 `json:"update_time"`
RunAt int64 `json:"run_at,omitempty"`
CheckIn string `json:"check_in,omitempty"`
CheckInAt int64 `json:"check_in_at,omitempty"`
DieAt int64 `json:"die_at,omitempty"`
HookStatus string `json:"hook_status,omitempty"`
}
//JobPoolStats represents the healthy and status of all the running worker pools.
type JobPoolStats struct {
Pools []*JobPoolStatsData `json:"worker_pools"`
}
//JobPoolStatsData represent the healthy and status of the worker pool.
type JobPoolStatsData struct {
WorkerPoolID string `json:"worker_pool_id"`
StartedAt int64 `json:"started_at"`
HeartbeatAt int64 `json:"heartbeat_at"`
JobNames []string `json:"job_names"`
Concurrency uint `json:"concurrency"`
Status string `json:"status"`
}
//JobActionRequest defines for triggering job action like stop/cancel.
type JobActionRequest struct {
Action string `json:"action"`
}
//JobStatusChange is designed for reporting the status change via hook.
type JobStatusChange struct {
JobID string `json:"job_id"`
Status string `json:"status"`
CheckIn string `json:"check_in,omitempty"`
}
//Message is designed for sub/pub messages
type Message struct {
Event string
Data interface{} //generic format
}

View File

@ -5,6 +5,7 @@ type ScanJobParms struct {
JobID int64 `json:"job_int_id"`
Repository string `json:"repository"`
Tag string `json:"tag"`
Digest string `json:digest"`
Secret string `json:"job_service_secret"`
RegistryURL string `json:"registry_url"`
ClairEndpoint string `json:"clair_endpoint"`

View File

@ -61,6 +61,7 @@ type RepJob struct {
Operation string `orm:"column(operation)" json:"operation"`
Tags string `orm:"column(tags)" json:"-"`
TagList []string `orm:"-" json:"tags"`
UUID string `orm:"column(job_uuid)" json:"-"`
// Policy RepPolicy `orm:"-" json:"policy"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`

View File

@ -29,6 +29,7 @@ type ScanJob struct {
Repository string `orm:"column(repository)" json:"repository"`
Tag string `orm:"column(tag)" json:"tag"`
Digest string `orm:"column(digest)" json:"digest"`
UUID string `orm:"column(job_uuid)" json:"-"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}

View File

@ -28,7 +28,7 @@ const (
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
jobServiceLoggerArchivePeriod = "JOB_SERVICE_LOGGER_ARCHIVE_PERIOD"
jobServiceAdminServerEndpoint = "JOB_SERVICE_ADMIN_SERVER_ENDPOINT"
jobServiceAdminServerEndpoint = "ADMINSERVER_URL"
jobServiceAuthSecret = "JOBSERVICE_SECRET"
//JobServiceProtocolHTTPS points to the 'https' protocol

View File

@ -64,7 +64,7 @@ func (cj *ClairJob) Run(ctx env.JobContext, params map[string]interface{}) error
if err != nil {
return err
}
imgDigest, _, payload, err := repoClient.PullManifest(jobParms.Tag, []string{schema2.MediaTypeManifest})
_, _, payload, err := repoClient.PullManifest(jobParms.Tag, []string{schema2.MediaTypeManifest})
if err != nil {
logger.Errorf("Error pulling manifest for image %s:%s :%v", jobParms.Repository, jobParms.Tag, err)
return err
@ -96,7 +96,7 @@ func (cj *ClairJob) Run(ctx env.JobContext, params map[string]interface{}) error
return err
}
compOverview, sev := clair.TransformVuln(res)
err = dao.UpdateImgScanOverview(imgDigest, layerName, sev, compOverview)
err = dao.UpdateImgScanOverview(jobParms.Digest, layerName, sev, compOverview)
return err
}

178
src/ui/utils/job.go Normal file
View File

@ -0,0 +1,178 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package utils contains methods to support security, cache, and webhook functions.
package utils
import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/job"
jobmodels "github.com/vmware/harbor/src/common/job/models"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/ui/config"
"encoding/json"
"sync"
)
var (
cl sync.Mutex
jobServiceClient job.Client
)
// ScanAllImages scans all images of Harbor by submiting jobs to jobservice, the whole process will move on if failed to submit any job of a single image.
func ScanAllImages() error {
repos, err := dao.GetRepositories()
if err != nil {
log.Errorf("Failed to list all repositories, error: %v", err)
return err
}
log.Infof("Scanning all images on Harbor.")
go scanRepos(repos)
return nil
}
// ScanImagesByProjectID scans all images under a projet, the whole process will move on if failed to submit any job of a single image.
func ScanImagesByProjectID(id int64) error {
repos, err := dao.GetRepositories(&models.RepositoryQuery{
ProjectIDs: []int64{id},
})
if err != nil {
log.Errorf("Failed list repositories in project %d, error: %v", id, err)
return err
}
log.Infof("Scanning all images in project: %d ", id)
go scanRepos(repos)
return nil
}
func scanRepos(repos []*models.RepoRecord) {
var repoClient *registry.Repository
var err error
var tags []string
for _, r := range repos {
repoClient, err = NewRepositoryClientForUI("harbor-ui", r.Name)
if err != nil {
log.Errorf("Failed to initialize client for repository: %s, error: %v, skip scanning", r.Name, err)
continue
}
tags, err = repoClient.ListTag()
if err != nil {
log.Errorf("Failed to get tags for repository: %s, error: %v, skip scanning.", r.Name, err)
continue
}
for _, t := range tags {
if err = TriggerImageScan(r.Name, t); err != nil {
log.Errorf("Failed to scan image with repository: %s, tag: %s, error: %v.", r.Name, t, err)
} else {
log.Debugf("Triggered scan for image with repository: %s, tag: %s", r.Name, t)
}
}
}
}
func GetJobServiceClient() job.Client {
cl.Lock()
defer cl.Unlock()
if jobServiceClient == nil {
jobServiceClient = job.NewDefaultClient(config.InternalJobServiceURL(), config.UISecret())
}
return jobServiceClient
}
// TriggerImageScan triggers an image scan job on jobservice.
func TriggerImageScan(repository string, tag string) error {
repoClient, err := NewRepositoryClientForUI("harbor-ui", repository)
if err != nil {
return err
}
digest, _, err := repoClient.ManifestExist(tag)
if err != nil {
log.Errorf("Failed to get Manifest for %s:%s", repository, tag)
return err
}
return triggerImageScan(repository, tag, digest, GetJobServiceClient())
}
func triggerImageScan(repository, tag, digest string, client job.Client) error {
id, err := dao.AddScanJob(models.ScanJob{
Repository: repository,
Digest: digest,
Tag: tag,
Status: models.JobPending,
})
if err != nil {
return err
}
err = dao.SetScanJobForImg(digest, id)
if err != nil {
return err
}
data, err := buildScanJobData(id, repository, tag, digest)
if err != nil {
return err
}
uuid, err := client.SubmitJob(data)
if err != nil {
return err
}
err = dao.SetScanJobUUID(id, uuid)
if err != nil {
log.Warningf("Failed to set UUID for scan job, ID: %d, repository: %s, tag: %s")
}
return nil
}
func buildScanJobData(jobID int64, repository, tag, digest string) (*jobmodels.JobData, error) {
regURL, err := config.RegistryURL()
if err != nil {
return nil, err
}
// TODO:job service can get some parms from context.
parms := job.ScanJobParms{
ClairEndpoint: config.ClairEndpoint(),
JobID: jobID,
RegistryURL: regURL,
Repository: repository,
Secret: config.JobserviceSecret(),
Digest: digest,
Tag: tag,
TokenEndpoint: config.InternalTokenServiceEndpoint(),
}
parmsMap := make(map[string]interface{})
b, err := json.Marshal(parms)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &parmsMap)
if err != nil {
return nil, err
}
meta := jobmodels.JobMetadata{
JobKind: job.GenericKind,
IsUnique: false,
}
data := &jobmodels.JobData{
Name: job.ImageScanJob,
Parameters: jobmodels.Parameters(parmsMap),
Metadata: &meta,
StatusHook: "",
}
return data, nil
}

View File

@ -16,73 +16,16 @@
package utils
import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/service/token"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)
// ScanAllImages scans all images of Harbor by submiting jobs to jobservice, the whole process will move on if failed to submit any job of a single image.
func ScanAllImages() error {
repos, err := dao.GetRepositories()
if err != nil {
log.Errorf("Failed to list all repositories, error: %v", err)
return err
}
log.Infof("Scanning all images on Harbor.")
go scanRepos(repos)
return nil
}
// ScanImagesByProjectID scans all images under a projet, the whole process will move on if failed to submit any job of a single image.
func ScanImagesByProjectID(id int64) error {
repos, err := dao.GetRepositories(&models.RepositoryQuery{
ProjectIDs: []int64{id},
})
if err != nil {
log.Errorf("Failed list repositories in project %d, error: %v", id, err)
return err
}
log.Infof("Scanning all images in project: %d ", id)
go scanRepos(repos)
return nil
}
func scanRepos(repos []*models.RepoRecord) {
var repoClient *registry.Repository
var err error
var tags []string
for _, r := range repos {
repoClient, err = NewRepositoryClientForUI("harbor-ui", r.Name)
if err != nil {
log.Errorf("Failed to initialize client for repository: %s, error: %v, skip scanning", r.Name, err)
continue
}
tags, err = repoClient.ListTag()
if err != nil {
log.Errorf("Failed to get tags for repository: %s, error: %v, skip scanning.", r.Name, err)
continue
}
for _, t := range tags {
if err = TriggerImageScan(r.Name, t); err != nil {
log.Errorf("Failed to scan image with repository: %s, tag: %s, error: %v.", r.Name, t, err)
} else {
log.Debugf("Triggered scan for image with repository: %s, tag: %s", r.Name, t)
}
}
}
}
// RequestAsUI is a shortcut to make a request attach UI secret and send the request.
// Do not use this when you want to handle the response
func RequestAsUI(method, url string, body io.Reader, h ResponseHandler) error {
@ -110,20 +53,6 @@ func AddUISecret(req *http.Request) {
}
}
// TriggerImageScan triggers an image scan job on jobservice.
func TriggerImageScan(repository string, tag string) error {
data := &models.ImageScanReq{
Repo: repository,
Tag: tag,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := fmt.Sprintf("%s/api/jobs/scan", config.InternalJobServiceURL())
return RequestAsUI("POST", url, bytes.NewBuffer(b), NewStatusRespHandler(http.StatusOK))
}
// NewRepositoryClientForUI creates a repository client that can only be used to
// access the internal registry
func NewRepositoryClientForUI(username, repository string) (*registry.Repository, error) {