mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-23 02:35:17 +01:00
Merge pull request #5957 from reasonerjt/scan-all-jobsvc
Schedule "scan all" via jobservice
This commit is contained in:
commit
ce0e195d18
@ -1,108 +0,0 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/scheduler"
|
||||
"github.com/goharbor/harbor/src/common/scheduler/policy"
|
||||
"github.com/goharbor/harbor/src/common/scheduler/task"
|
||||
)
|
||||
|
||||
const (
|
||||
// PolicyTypeDaily specify the policy type is "daily"
|
||||
PolicyTypeDaily = "daily"
|
||||
|
||||
// PolicyTypeNone specify the policy type is "none"
|
||||
PolicyTypeNone = "none"
|
||||
|
||||
alternatePolicy = "Alternate Policy"
|
||||
)
|
||||
|
||||
// ScanPolicyNotification is defined for pass the policy change data.
|
||||
type ScanPolicyNotification struct {
|
||||
// Type is used to keep the scan policy type: "none","daily" and "refresh".
|
||||
Type string
|
||||
|
||||
// DailyTime is used when the type is 'daily', the offset with UTC time 00:00.
|
||||
DailyTime int64
|
||||
}
|
||||
|
||||
// ScanPolicyNotificationHandler is defined to handle the changes of scanning
|
||||
// policy.
|
||||
type ScanPolicyNotificationHandler struct{}
|
||||
|
||||
// IsStateful to indicate this handler is stateful.
|
||||
func (s *ScanPolicyNotificationHandler) IsStateful() bool {
|
||||
// Policy change should be done one by one.
|
||||
return true
|
||||
}
|
||||
|
||||
// Handle the policy change notification.
|
||||
func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
|
||||
if value == nil {
|
||||
return errors.New("ScanPolicyNotificationHandler can not handle nil value")
|
||||
}
|
||||
|
||||
if reflect.TypeOf(value).Kind() != reflect.Struct ||
|
||||
reflect.TypeOf(value).String() != "notifier.ScanPolicyNotification" {
|
||||
return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type")
|
||||
}
|
||||
|
||||
notification := value.(ScanPolicyNotification)
|
||||
|
||||
hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy)
|
||||
if notification.Type == PolicyTypeDaily {
|
||||
if !hasScheduled {
|
||||
// Schedule a new policy.
|
||||
return schedulePolicy(notification)
|
||||
}
|
||||
|
||||
// To check and compare if the related parameter is changed.
|
||||
if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil {
|
||||
policyCandidate := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{
|
||||
Duration: 24 * time.Hour,
|
||||
OffsetTime: notification.DailyTime,
|
||||
})
|
||||
if !pl.Equal(policyCandidate) {
|
||||
// Parameter changed.
|
||||
// Unschedule policy.
|
||||
if err := scheduler.DefaultScheduler.UnSchedule(alternatePolicy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Schedule a new policy.
|
||||
return schedulePolicy(notification)
|
||||
}
|
||||
// Same policy configuration, do nothing
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("Inconsistent policy scheduling status")
|
||||
} else if notification.Type == PolicyTypeNone {
|
||||
if hasScheduled {
|
||||
return scheduler.DefaultScheduler.UnSchedule(alternatePolicy)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("Notification type %s is not supported", notification.Type)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Schedule policy.
|
||||
func schedulePolicy(notification ScanPolicyNotification) error {
|
||||
schedulePolicy := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{
|
||||
Duration: 24 * time.Hour,
|
||||
OffsetTime: notification.DailyTime,
|
||||
})
|
||||
attachTask := task.NewScanAllTask()
|
||||
if err := schedulePolicy.AttachTasks(attachTask); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
|
||||
}
|
@ -1,73 +0,0 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/scheduler"
|
||||
"github.com/goharbor/harbor/src/common/scheduler/policy"
|
||||
)
|
||||
|
||||
var testingScheduler = scheduler.DefaultScheduler
|
||||
|
||||
func TestScanPolicyNotificationHandler(t *testing.T) {
|
||||
// Scheduler should be running.
|
||||
testingScheduler.Start()
|
||||
if !testingScheduler.IsRunning() {
|
||||
t.Fatal("scheduler should be running")
|
||||
}
|
||||
|
||||
handler := &ScanPolicyNotificationHandler{}
|
||||
if !handler.IsStateful() {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
utcTime := time.Now().UTC().Unix()
|
||||
notification := ScanPolicyNotification{"daily", utcTime + 3600}
|
||||
if err := handler.Handle(notification); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !testingScheduler.HasScheduled("Alternate Policy") {
|
||||
t.Fatal("Handler does not work")
|
||||
}
|
||||
|
||||
// Policy parameter changed.
|
||||
notification2 := ScanPolicyNotification{"daily", utcTime + 7200}
|
||||
if err := handler.Handle(notification2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !testingScheduler.HasScheduled("Alternate Policy") {
|
||||
t.Fatal("Handler does not work [2]")
|
||||
}
|
||||
pl := testingScheduler.GetPolicy("Alternate Policy")
|
||||
if pl == nil {
|
||||
t.Fail()
|
||||
}
|
||||
spl := pl.(*policy.AlternatePolicy)
|
||||
cfg := spl.GetConfig()
|
||||
if cfg == nil {
|
||||
t.Fail()
|
||||
}
|
||||
if cfg.OffsetTime != utcTime+7200 {
|
||||
t.Fatal("Policy is not updated")
|
||||
}
|
||||
|
||||
notification3 := ScanPolicyNotification{"none", 0}
|
||||
if err := handler.Handle(notification3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if testingScheduler.HasScheduled("Alternate Policy") {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
// Clear
|
||||
testingScheduler.Stop()
|
||||
// Waiting for everything is ready.
|
||||
<-time.After(1 * time.Second)
|
||||
if testingScheduler.IsRunning() {
|
||||
t.Fatal("scheduler should be stopped")
|
||||
}
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
package replication
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/replication/event/notification"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
)
|
||||
|
@ -20,9 +20,9 @@ import (
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
api_models "github.com/goharbor/harbor/src/core/api/models"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/replication/core"
|
||||
"github.com/goharbor/harbor/src/replication/event/notification"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/common"
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/clair"
|
||||
registry_error "github.com/goharbor/harbor/src/common/utils/error"
|
||||
@ -37,6 +36,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/common/utils/notary"
|
||||
"github.com/goharbor/harbor/src/common/utils/registry"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
coreutils "github.com/goharbor/harbor/src/core/utils"
|
||||
"github.com/goharbor/harbor/src/replication/event/notification"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
@ -932,40 +932,22 @@ func (ra *RepositoryAPI) ScanAll() {
|
||||
ra.HandleUnauthorized()
|
||||
return
|
||||
}
|
||||
projectIDStr := ra.GetString("project_id")
|
||||
if len(projectIDStr) > 0 { // scan images under the project only.
|
||||
pid, err := strconv.ParseInt(projectIDStr, 10, 64)
|
||||
if err != nil || pid <= 0 {
|
||||
ra.HandleBadRequest(fmt.Sprintf("Invalid project_id %s", projectIDStr))
|
||||
return
|
||||
}
|
||||
if !ra.SecurityCtx.HasAllPerm(pid) {
|
||||
ra.HandleForbidden(ra.SecurityCtx.GetUsername())
|
||||
return
|
||||
}
|
||||
if err := coreutils.ScanImagesByProjectID(pid); err != nil {
|
||||
log.Errorf("Failed triggering scan images in project: %d, error: %v", pid, err)
|
||||
ra.HandleInternalServerError(fmt.Sprintf("Error: %v", err))
|
||||
return
|
||||
}
|
||||
} else { // scan all images in Harbor
|
||||
if !ra.SecurityCtx.IsSysAdmin() {
|
||||
ra.HandleForbidden(ra.SecurityCtx.GetUsername())
|
||||
return
|
||||
}
|
||||
if !utils.ScanAllMarker().Check() {
|
||||
log.Warningf("There is a scan all scheduled at: %v, the request will not be processed.", utils.ScanAllMarker().Next())
|
||||
ra.RenderError(http.StatusPreconditionFailed, "Unable handle frequent scan all requests")
|
||||
return
|
||||
}
|
||||
|
||||
if err := coreutils.ScanAllImages(); err != nil {
|
||||
log.Errorf("Failed triggering scan all images, error: %v", err)
|
||||
ra.HandleInternalServerError(fmt.Sprintf("Error: %v", err))
|
||||
return
|
||||
}
|
||||
utils.ScanAllMarker().Mark()
|
||||
if !ra.SecurityCtx.IsSysAdmin() {
|
||||
ra.HandleForbidden(ra.SecurityCtx.GetUsername())
|
||||
return
|
||||
}
|
||||
if !utils.ScanAllMarker().Check() {
|
||||
log.Warningf("There is a scan all scheduled at: %v, the request will not be processed.", utils.ScanAllMarker().Next())
|
||||
ra.RenderError(http.StatusPreconditionFailed, "Unable handle frequent scan all requests")
|
||||
return
|
||||
}
|
||||
|
||||
if err := coreutils.ScanAllImages(); err != nil {
|
||||
log.Errorf("Failed triggering scan all images, error: %v", err)
|
||||
ra.HandleInternalServerError(fmt.Sprintf("Error: %v", err))
|
||||
return
|
||||
}
|
||||
utils.ScanAllMarker().Mark()
|
||||
ra.Ctx.ResponseWriter.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/clair"
|
||||
registry_error "github.com/goharbor/harbor/src/common/utils/error"
|
||||
@ -30,6 +29,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/common/utils/registry"
|
||||
"github.com/goharbor/harbor/src/common/utils/registry/auth"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/core/promgr"
|
||||
"github.com/goharbor/harbor/src/core/service/token"
|
||||
coreutils "github.com/goharbor/harbor/src/core/utils"
|
||||
|
@ -26,8 +26,6 @@ import (
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/scheduler"
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/api"
|
||||
@ -36,6 +34,7 @@ import (
|
||||
_ "github.com/goharbor/harbor/src/core/auth/uaa"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/core/filter"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/core/proxy"
|
||||
"github.com/goharbor/harbor/src/core/service/token"
|
||||
"github.com/goharbor/harbor/src/replication/core"
|
||||
@ -110,9 +109,6 @@ func main() {
|
||||
log.Fatalf("Failed to initialize API handlers with error: %s", err.Error())
|
||||
}
|
||||
|
||||
// Enable the policy scheduler here.
|
||||
scheduler.DefaultScheduler.Start()
|
||||
|
||||
// Subscribe the policy change topic.
|
||||
if err = notifier.Subscribe(notifier.ScanAllPolicyTopic, ¬ifier.ScanPolicyNotificationHandler{}); err != nil {
|
||||
log.Errorf("failed to subscribe scan all policy change topic: %v", err)
|
||||
|
@ -5,8 +5,6 @@ import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/scheduler"
|
||||
)
|
||||
|
||||
var statefulData int32
|
||||
@ -182,46 +180,3 @@ func TestConcurrentPublish(t *testing.T) {
|
||||
// Clear stateful data.
|
||||
atomic.StoreInt32(&statefulData, 0)
|
||||
}
|
||||
|
||||
func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) {
|
||||
scheduler.DefaultScheduler.Start()
|
||||
if !scheduler.DefaultScheduler.IsRunning() {
|
||||
t.Fatal("Policy scheduler is not started")
|
||||
}
|
||||
|
||||
count := len(notificationWatcher.handlers)
|
||||
if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if len(notificationWatcher.handlers) != (count + 1) {
|
||||
t.Fatalf("Handler is not registered")
|
||||
}
|
||||
|
||||
utcTime := time.Now().UTC().Unix()
|
||||
notification := ScanPolicyNotification{"daily", utcTime + 3600}
|
||||
for i := 1; i <= 10; i++ {
|
||||
notification.DailyTime += (int64)(i)
|
||||
if err := Publish("testing_topic", notification); err != nil {
|
||||
t.Fatalf("index=%d, error=%s", i, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Wating for everything is ready.
|
||||
<-time.After(2 * time.Second)
|
||||
|
||||
if err := UnSubscribe("testing_topic", ""); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if len(notificationWatcher.handlers) != count {
|
||||
t.Fatal("Handler is not unregistered")
|
||||
}
|
||||
|
||||
scheduler.DefaultScheduler.Stop()
|
||||
// Wating for everything is ready.
|
||||
<-time.After(1 * time.Second)
|
||||
if scheduler.DefaultScheduler.IsRunning() {
|
||||
t.Fatal("Policy scheduler is not stopped")
|
||||
}
|
||||
|
||||
}
|
104
src/core/notifier/scan_policy_notitification_handler.go
Normal file
104
src/core/notifier/scan_policy_notitification_handler.go
Normal file
@ -0,0 +1,104 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
common_http "github.com/goharbor/harbor/src/common/http"
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
common_utils "github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
// PolicyTypeDaily specify the policy type is "daily"
|
||||
PolicyTypeDaily = "daily"
|
||||
// PolicyTypeNone specify the policy type is "none"
|
||||
PolicyTypeNone = "none"
|
||||
)
|
||||
|
||||
// ScanPolicyNotification is defined for pass the policy change data.
|
||||
type ScanPolicyNotification struct {
|
||||
// Type is used to keep the scan policy type: "none","daily" and "refresh".
|
||||
Type string
|
||||
|
||||
// DailyTime is used when the type is 'daily', the offset with UTC time 00:00.
|
||||
DailyTime int64
|
||||
}
|
||||
|
||||
// ScanPolicyNotificationHandler is defined to handle the changes of scanning
|
||||
// policy.
|
||||
type ScanPolicyNotificationHandler struct{}
|
||||
|
||||
// IsStateful to indicate this handler is stateful.
|
||||
func (s *ScanPolicyNotificationHandler) IsStateful() bool {
|
||||
// Policy change should be done one by one.
|
||||
return true
|
||||
}
|
||||
|
||||
// Handle the policy change notification.
|
||||
func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
|
||||
notification, ok := value.(ScanPolicyNotification)
|
||||
if !ok {
|
||||
return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type")
|
||||
}
|
||||
|
||||
if notification.Type == PolicyTypeDaily {
|
||||
if err := cancelScanAllJobs(); err != nil {
|
||||
return fmt.Errorf("Failed to cancel scan_all jobs, error: %v", err)
|
||||
}
|
||||
h, m, s := common_utils.ParseOfftime(notification.DailyTime)
|
||||
cron := fmt.Sprintf("%d %d %d * * *", s, m, h)
|
||||
if err := utils.ScheduleScanAllImages(cron); err != nil {
|
||||
return fmt.Errorf("Failed to schedule scan_all job, error: %v", err)
|
||||
}
|
||||
} else if notification.Type == PolicyTypeNone {
|
||||
if err := cancelScanAllJobs(); err != nil {
|
||||
return fmt.Errorf("Failed to cancel scan_all jobs, error: %v", err)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("Notification type %s is not supported", notification.Type)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func cancelScanAllJobs(c ...job.Client) error {
|
||||
var client job.Client
|
||||
if c == nil || len(c) == 0 {
|
||||
client = utils.GetJobServiceClient()
|
||||
} else {
|
||||
client = c[0]
|
||||
}
|
||||
q := &models.AdminJobQuery{
|
||||
Name: job.ImageScanAllJob,
|
||||
Kind: job.JobKindPeriodic,
|
||||
}
|
||||
jobs, err := dao.GetAdminJobs(q)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to query sheduled scan_all jobs, error: %v", err)
|
||||
return err
|
||||
}
|
||||
if len(jobs) > 1 {
|
||||
log.Warningf("Got more than one scheduled scan_all jobs: %+v", jobs)
|
||||
}
|
||||
for _, j := range jobs {
|
||||
if err := dao.DeleteAdminJob(j.ID); err != nil {
|
||||
log.Warningf("Failed to delete scan_all job from DB, job ID: %d, job UUID: %s, error: %v", j.ID, j.UUID, err)
|
||||
}
|
||||
if err := client.PostAction(j.UUID, job.JobActionStop); err != nil {
|
||||
if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusNotFound {
|
||||
log.Warningf("scan_all job not found on jobservice, UUID: %s, skip", j.UUID)
|
||||
} else {
|
||||
log.Errorf("Failed to stop scan_all job, UUID: %s, error: %v", j.UUID, e)
|
||||
return e
|
||||
}
|
||||
}
|
||||
log.Infof("scan_all job canceled, uuid: %s, id: %d", j.UUID, j.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
16
src/core/notifier/scan_policy_notitification_handler_test.go
Normal file
16
src/core/notifier/scan_policy_notitification_handler_test.go
Normal file
@ -0,0 +1,16 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestScanPolicyNotificationHandler(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s := &ScanPolicyNotificationHandler{}
|
||||
assert.True(s.IsStateful())
|
||||
err := s.Handle("")
|
||||
if assert.NotNil(err) {
|
||||
assert.Contains(err.Error(), "invalid type")
|
||||
}
|
||||
}
|
@ -23,11 +23,11 @@ import (
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
clairdao "github.com/goharbor/harbor/src/common/dao/clair"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/api"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
coreutils "github.com/goharbor/harbor/src/core/utils"
|
||||
rep_notification "github.com/goharbor/harbor/src/replication/event/notification"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
jobmodels "github.com/goharbor/harbor/src/common/job/models"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/common/utils/registry"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
|
||||
"encoding/json"
|
||||
@ -34,56 +33,49 @@ var (
|
||||
jobServiceClient job.Client
|
||||
)
|
||||
|
||||
// ScanAllImages scans all images of Harbor by submiting jobs to jobservice, the whole process will move on if failed to submit any job of a single image.
|
||||
// ScanAllImages scans all images of Harbor by submiting a scan all job to jobservice, and the job handler will call API
|
||||
// on the "core" service
|
||||
func ScanAllImages() error {
|
||||
repos, err := dao.GetRepositories()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to list all repositories, error: %v", err)
|
||||
return err
|
||||
}
|
||||
log.Infof("Scanning all images on Harbor.")
|
||||
|
||||
go scanRepos(repos)
|
||||
return nil
|
||||
_, err := scanAll("")
|
||||
return err
|
||||
}
|
||||
|
||||
// ScanImagesByProjectID scans all images under a projet, the whole process will move on if failed to submit any job of a single image.
|
||||
func ScanImagesByProjectID(id int64) error {
|
||||
repos, err := dao.GetRepositories(&models.RepositoryQuery{
|
||||
ProjectIDs: []int64{id},
|
||||
// ScheduleScanAllImages will schedule a scan all job based on the cron string, add append a record in admin job table.
|
||||
func ScheduleScanAllImages(cron string) error {
|
||||
_, err := scanAll(cron)
|
||||
return err
|
||||
}
|
||||
|
||||
func scanAll(cron string, c ...job.Client) (string, error) {
|
||||
var client job.Client
|
||||
if c == nil || len(c) == 0 {
|
||||
client = GetJobServiceClient()
|
||||
} else {
|
||||
client = c[0]
|
||||
}
|
||||
kind := job.JobKindGeneric
|
||||
if len(cron) > 0 {
|
||||
kind = job.JobKindPeriodic
|
||||
}
|
||||
meta := &jobmodels.JobMetadata{
|
||||
JobKind: kind,
|
||||
IsUnique: true,
|
||||
Cron: cron,
|
||||
}
|
||||
id, err := dao.AddAdminJob(&models.AdminJob{
|
||||
Name: job.ImageScanAllJob,
|
||||
Kind: kind,
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("Failed list repositories in project %d, error: %v", id, err)
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
log.Infof("Scanning all images in project: %d ", id)
|
||||
go scanRepos(repos)
|
||||
return nil
|
||||
}
|
||||
|
||||
func scanRepos(repos []*models.RepoRecord) {
|
||||
var repoClient *registry.Repository
|
||||
var err error
|
||||
var tags []string
|
||||
for _, r := range repos {
|
||||
repoClient, err = NewRepositoryClientForUI("harbor-core", r.Name)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to initialize client for repository: %s, error: %v, skip scanning", r.Name, err)
|
||||
continue
|
||||
}
|
||||
tags, err = repoClient.ListTag()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get tags for repository: %s, error: %v, skip scanning.", r.Name, err)
|
||||
continue
|
||||
}
|
||||
for _, t := range tags {
|
||||
if err = TriggerImageScan(r.Name, t); err != nil {
|
||||
log.Errorf("Failed to scan image with repository: %s, tag: %s, error: %v.", r.Name, t, err)
|
||||
} else {
|
||||
log.Debugf("Triggered scan for image with repository: %s, tag: %s", r.Name, t)
|
||||
}
|
||||
}
|
||||
data := &jobmodels.JobData{
|
||||
Name: job.ImageScanAllJob,
|
||||
Metadata: meta,
|
||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/adminjob/%d", config.InternalCoreURL(), id),
|
||||
}
|
||||
log.Infof("scan_all job scheduled/triggered, cron string: '%s'", cron)
|
||||
return client.SubmitJob(data)
|
||||
}
|
||||
|
||||
// GetJobServiceClient returns the job service client instance.
|
||||
@ -134,7 +126,7 @@ func triggerImageScan(repository, tag, digest string, client job.Client) error {
|
||||
}
|
||||
err = dao.SetScanJobUUID(id, uuid)
|
||||
if err != nil {
|
||||
log.Warningf("Failed to set UUID for scan job, ID: %d, repository: %s, tag: %s")
|
||||
log.Warningf("Failed to set UUID for scan job, ID: %d, repository: %s, tag: %s", id, uuid, repository, tag)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
50
src/core/utils/job_test.go
Normal file
50
src/core/utils/job_test.go
Normal file
@ -0,0 +1,50 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
jobmodels "github.com/goharbor/harbor/src/common/job/models"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type jobDataTestEntry struct {
|
||||
input job.ScanJobParms
|
||||
expect jobmodels.JobData
|
||||
}
|
||||
|
||||
func TestBuildScanJobData(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
testData := []jobDataTestEntry{
|
||||
{input: job.ScanJobParms{
|
||||
JobID: 123,
|
||||
Digest: "sha256:abcde",
|
||||
Repository: "library/ubuntu",
|
||||
Tag: "latest",
|
||||
},
|
||||
expect: jobmodels.JobData{
|
||||
Name: job.ImageScanJob,
|
||||
Parameters: map[string]interface{}{
|
||||
"job_int_id": 123,
|
||||
"repository": "library/ubuntu",
|
||||
"tag": "latest",
|
||||
"digest": "sha256:abcde",
|
||||
},
|
||||
Metadata: &jobmodels.JobMetadata{
|
||||
JobKind: job.JobKindGeneric,
|
||||
IsUnique: false,
|
||||
},
|
||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/scan/%d", config.InternalCoreURL(), 123),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, d := range testData {
|
||||
r, err := buildScanJobData(d.input.JobID, d.input.Repository, d.input.Tag, d.input.Digest)
|
||||
assert.Nil(err)
|
||||
assert.Equal(d.expect.Name, r.Name)
|
||||
// assert.Equal(d.expect.Parameters, r.Parameters)
|
||||
assert.Equal(d.expect.StatusHook, r.StatusHook)
|
||||
}
|
||||
}
|
@ -14,8 +14,18 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
err := config.Init()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rc := m.Run()
|
||||
os.Exit(rc)
|
||||
|
||||
}
|
||||
|
@ -15,8 +15,8 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
)
|
||||
|
||||
|
@ -20,9 +20,9 @@ import (
|
||||
"reflect"
|
||||
|
||||
common_models "github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/notifier"
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/replication"
|
||||
"github.com/goharbor/harbor/src/replication/event/notification"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
|
56
src/testing/job/mock_client.go
Normal file
56
src/testing/job/mock_client.go
Normal file
@ -0,0 +1,56 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/http"
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
"github.com/goharbor/harbor/src/common/job/models"
|
||||
)
|
||||
|
||||
// MockJobClient ...
|
||||
type MockJobClient struct {
|
||||
JobUUID []string
|
||||
}
|
||||
|
||||
// GetJobLog ...
|
||||
func (mjc *MockJobClient) GetJobLog(uuid string) ([]byte, error) {
|
||||
if uuid == "500" {
|
||||
return nil, &http.Error{500, "Server side error"}
|
||||
}
|
||||
if mjc.validUUID(uuid) {
|
||||
return []byte("some log"), nil
|
||||
}
|
||||
return nil, &http.Error{404, "Not Found"}
|
||||
}
|
||||
|
||||
// SubmitJob ...
|
||||
func (mjc *MockJobClient) SubmitJob(data *models.JobData) (string, error) {
|
||||
if data.Name == job.ImageScanAllJob || data.Name == job.ImageReplicate || data.Name == job.ImageGC || data.Name == job.ImageScanJob {
|
||||
uuid := fmt.Sprintf("u-%d", rand.Int())
|
||||
mjc.JobUUID = append(mjc.JobUUID, uuid)
|
||||
return uuid, nil
|
||||
}
|
||||
return "", fmt.Errorf("Unsupported job %s", data.Name)
|
||||
}
|
||||
|
||||
// PostAction ...
|
||||
func (mjc *MockJobClient) PostAction(uuid, action string) error {
|
||||
if "500" == uuid {
|
||||
return &http.Error{500, "Server side error"}
|
||||
}
|
||||
if !mjc.validUUID(uuid) {
|
||||
return &http.Error{404, "Not Found"}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mjc *MockJobClient) validUUID(uuid string) bool {
|
||||
for _, u := range mjc.JobUUID {
|
||||
if uuid == u {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
Loading…
Reference in New Issue
Block a user