Refactor: p2p preheat dragonfly driver (#20922)

This commit is contained in:
Chlins Zhang 2024-09-21 11:05:01 +08:00 committed by GitHub
parent 8d52a63311
commit c97253f660
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 483 additions and 363 deletions

View File

@ -7095,6 +7095,9 @@ definitions:
type: boolean
description: Whether the preheat policy enabled
x-omitempty: false
scope:
type: string
description: The scope of preheat policy
creation_time:
type: string
format: date-time

View File

@ -3,3 +3,5 @@ Add new column creator_ref and creator_type for robot table to record the creato
*/
ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_ref integer default 0;
ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_type varchar(255);
ALTER TABLE p2p_preheat_policy ADD COLUMN IF NOT EXISTS scope varchar(255);

View File

@ -402,7 +402,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
// Start tasks
count := 0
for _, c := range candidates {
if _, err = de.startTask(ctx, eid, c, insData); err != nil {
if _, err = de.startTask(ctx, eid, c, insData, pl.Scope); err != nil {
// Just log the error and skip
log.Errorf("start task error for preheating image: %s/%s:%s@%s", c.Namespace, c.Repository, c.Tags[0], c.Digest)
continue
@ -421,7 +421,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
}
// startTask starts the preheat task(job) for the given candidate
func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, candidate *selector.Candidate, instance string) (int64, error) {
func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, candidate *selector.Candidate, instance, scope string) (int64, error) {
u, err := de.fullURLGetter(candidate)
if err != nil {
return -1, err
@ -441,6 +441,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can
ImageName: fmt.Sprintf("%s/%s", candidate.Namespace, candidate.Repository),
Tag: candidate.Tags[0],
Digest: candidate.Digest,
Scope: scope,
}
piData, err := pi.ToJSON()

View File

@ -210,6 +210,7 @@ func mockPolicies() []*po.Schema {
Type: po.TriggerTypeManual,
},
Enabled: true,
Scope: "single_peer",
CreatedAt: time.Now().UTC(),
UpdatedTime: time.Now().UTC(),
}, {
@ -235,6 +236,7 @@ func mockPolicies() []*po.Schema {
Trigger: &po.Trigger{
Type: po.TriggerTypeEventBased,
},
Scope: "all_peers",
Enabled: true,
CreatedAt: time.Now().UTC(),
UpdatedTime: time.Now().UTC(),

View File

@ -16,6 +16,7 @@ package instance
import (
"context"
"encoding/json"
"github.com/goharbor/harbor/src/lib/q"
dao "github.com/goharbor/harbor/src/pkg/p2p/preheat/dao/instance"
@ -114,7 +115,18 @@ func (dm *manager) Update(ctx context.Context, inst *provider.Instance, props ..
// Get implements @Manager.Get
func (dm *manager) Get(ctx context.Context, id int64) (*provider.Instance, error) {
return dm.dao.Get(ctx, id)
ins, err := dm.dao.Get(ctx, id)
if err != nil {
return nil, err
}
// mapping auth data to auth info.
if len(ins.AuthData) > 0 {
if err := json.Unmarshal([]byte(ins.AuthData), &ins.AuthInfo); err != nil {
return nil, err
}
}
return ins, nil
}
// Get implements @Manager.GetByName

View File

@ -191,6 +191,11 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
return preheatJobRunningError(errors.Errorf("preheat failed: %s", s))
case provider.PreheatingStatusSuccess:
// Finished
// log the message if received message from provider.
if s.Message != "" {
myLogger.Infof("Preheat job finished, message from provider: \n%s", s.Message)
}
return nil
default:
// do nothing, check again

View File

@ -16,12 +16,10 @@ package policy
import (
"encoding/json"
"fmt"
"strconv"
"time"
beego_orm "github.com/beego/beego/v2/client/orm"
"github.com/beego/beego/v2/core/validation"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/lib/errors"
@ -32,6 +30,9 @@ func init() {
beego_orm.RegisterModel(&Schema{})
}
// ScopeType represents the preheat scope type.
type ScopeType = string
const (
// Filters:
// Repository : type=Repository value=name text (double star pattern used)
@ -57,6 +58,11 @@ const (
TriggerTypeScheduled TriggerType = "scheduled"
// TriggerTypeEventBased represents the event_based trigger type
TriggerTypeEventBased TriggerType = "event_based"
// ScopeTypeSinglePeer represents preheat image to single peer in p2p cluster.
ScopeTypeSinglePeer ScopeType = "single_peer"
// ScopeTypeAllPeers represents preheat image to all peers in p2p cluster.
ScopeTypeAllPeers ScopeType = "all_peers"
)
// Schema defines p2p preheat policy schema
@ -74,6 +80,8 @@ type Schema struct {
// Use JSON data format (query by trigger type should be supported)
TriggerStr string `orm:"column(trigger)" json:"-"`
Enabled bool `orm:"column(enabled)" json:"enabled"`
// Scope decides the preheat scope.
Scope string `orm:"column(scope)" json:"scope"`
CreatedAt time.Time `orm:"column(creation_time)" json:"creation_time"`
UpdatedTime time.Time `orm:"column(update_time)" json:"update_time"`
}
@ -127,67 +135,15 @@ func (s *Schema) ValidatePreheatPolicy() error {
WithMessage("invalid cron string for scheduled preheat: %s, error: %v", s.Trigger.Settings.Cron, err)
}
}
// validate preheat scope
if s.Scope != "" && s.Scope != ScopeTypeSinglePeer && s.Scope != ScopeTypeAllPeers {
return errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid scope for preheat policy: %s", s.Scope)
}
return nil
}
// Valid the policy
func (s *Schema) Valid(v *validation.Validation) {
if len(s.Name) == 0 {
_ = v.SetError("name", "cannot be empty")
}
// valid the filters
for _, filter := range s.Filters {
switch filter.Type {
case FilterTypeRepository, FilterTypeTag, FilterTypeVulnerability:
_, ok := filter.Value.(string)
if !ok {
_ = v.SetError("filters", "the type of filter value isn't string")
break
}
case FilterTypeSignature:
_, ok := filter.Value.(bool)
if !ok {
_ = v.SetError("filers", "the type of signature filter value isn't bool")
break
}
case FilterTypeLabel:
labels, ok := filter.Value.([]interface{})
if !ok {
_ = v.SetError("filters", "the type of label filter value isn't string slice")
break
}
for _, label := range labels {
_, ok := label.(string)
if !ok {
_ = v.SetError("filters", "the type of label filter value isn't string slice")
break
}
}
default:
_ = v.SetError("filters", "invalid filter type")
}
}
// valid trigger
if s.Trigger != nil {
switch s.Trigger.Type {
case TriggerTypeManual, TriggerTypeEventBased:
case TriggerTypeScheduled:
if len(s.Trigger.Settings.Cron) == 0 {
_ = v.SetError("trigger", fmt.Sprintf("the cron string cannot be empty when the trigger type is %s", TriggerTypeScheduled))
} else {
_, err := utils.CronParser().Parse(s.Trigger.Settings.Cron)
if err != nil {
_ = v.SetError("trigger", fmt.Sprintf("invalid cron string for scheduled trigger: %s", s.Trigger.Settings.Cron))
}
}
default:
_ = v.SetError("trigger", "invalid trigger type")
}
}
}
// Encode encodes policy schema.
func (s *Schema) Encode() error {
if s.Filters != nil {

View File

@ -17,8 +17,6 @@ package policy
import (
"testing"
"github.com/beego/beego/v2/core/validation"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@ -66,92 +64,13 @@ func (p *PolicyTestSuite) TestValidatePreheatPolicy() {
// valid cron string
p.schema.Trigger.Settings.Cron = "0 0 0 1 1 *"
p.NoError(p.schema.ValidatePreheatPolicy())
}
// TestValid tests Valid method.
func (p *PolicyTestSuite) TestValid() {
// policy name is empty, should return error
v := &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "no policy name should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "cannot be empty")
// policy with name but with error filter type
p.schema.Name = "policy-test"
p.schema.Filters = []*Filter{
{
Type: "invalid-type",
},
}
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid filter type should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "invalid filter type")
filterCases := [][]*Filter{
{
{
Type: FilterTypeSignature,
Value: "invalid-value",
},
},
{
{
Type: FilterTypeTag,
Value: true,
},
},
{
{
Type: FilterTypeLabel,
Value: "invalid-value",
},
},
}
// with valid filter type but with error value type
for _, filters := range filterCases {
p.schema.Filters = filters
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid filter value type should return one error")
}
// with valid filter but error trigger type
p.schema.Filters = []*Filter{
{
Type: FilterTypeSignature,
Value: true,
},
}
p.schema.Trigger = &Trigger{
Type: "invalid-type",
}
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid trigger type should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "invalid trigger type")
// with valid filter but error trigger value
p.schema.Trigger = &Trigger{
Type: TriggerTypeScheduled,
}
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid trigger value should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "the cron string cannot be empty")
// with invalid cron
p.schema.Trigger.Settings.Cron = "1111111111111"
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid trigger value should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "invalid cron string for scheduled trigger")
// all is well
p.schema.Trigger.Settings.Cron = "0/12 * * * * *"
v = &validation.Validation{}
p.schema.Valid(v)
require.False(p.T(), v.HasErrors(), "should return nil error")
// invalid preheat scope
p.schema.Scope = "invalid scope"
p.Error(p.schema.ValidatePreheatPolicy())
// valid preheat scope
p.schema.Scope = "single_peer"
p.NoError(p.schema.ValidatePreheatPolicy())
}
// TestDecode tests decode.
@ -167,11 +86,14 @@ func (p *PolicyTestSuite) TestDecode() {
Trigger: nil,
TriggerStr: "{\"type\":\"event_based\",\"trigger_setting\":{\"cron\":\"\"}}",
Enabled: false,
Scope: "all_peers",
}
p.NoError(s.Decode())
p.Len(s.Filters, 3)
p.NotNil(s.Trigger)
p.Equal(ScopeTypeAllPeers, s.Scope)
// invalid filter or trigger
s.FiltersStr = ""
s.TriggerStr = "invalid"
@ -210,8 +132,10 @@ func (p *PolicyTestSuite) TestEncode() {
},
TriggerStr: "",
Enabled: false,
Scope: "single_peer",
}
p.NoError(s.Encode())
p.Equal(`[{"type":"repository","value":"**"},{"type":"tag","value":"**"},{"type":"label","value":"test"}]`, s.FiltersStr)
p.Equal(`{"type":"event_based","trigger_setting":{}}`, s.TriggerStr)
p.Equal(ScopeTypeSinglePeer, s.Scope)
}

View File

@ -15,37 +15,139 @@
package provider
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/client"
"github.com/olekukonko/tablewriter"
)
const (
healthCheckEndpoint = "/_ping"
preheatEndpoint = "/preheats"
preheatTaskEndpoint = "/preheats/{task_id}"
dragonflyPending = "WAITING"
dragonflyFailed = "FAILED"
// dragonflyHealthPath is the health check path for dragonfly openapi.
dragonflyHealthPath = "/healthy"
// dragonflyJobPath is the job path for dragonfly openapi.
dragonflyJobPath = "/oapi/v1/jobs"
)
type dragonflyPreheatCreateResp struct {
ID string `json:"ID"`
const (
// dragonflyJobPendingState is the pending state of the job, which means
// the job is waiting to be processed and running.
dragonflyJobPendingState = "PENDING"
// dragonflyJobSuccessState is the success state of the job, which means
// the job is processed successfully.
dragonflyJobSuccessState = "SUCCESS"
// dragonflyJobFailureState is the failure state of the job, which means
// the job is processed failed.
dragonflyJobFailureState = "FAILURE"
)
type dragonflyCreateJobRequest struct {
// Type is the job type, support preheat.
Type string `json:"type" binding:"required"`
// Args is the preheating args.
Args dragonflyCreateJobRequestArgs `json:"args" binding:"omitempty"`
// SchedulerClusterIDs is the scheduler cluster ids for preheating.
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
}
type dragonflyPreheatInfo struct {
ID string `json:"ID"`
StartTime string `json:"startTime,omitempty"`
FinishTime string `json:"finishTime,omitempty"`
ErrorMsg string `json:"errorMsg"`
Status string
type dragonflyCreateJobRequestArgs struct {
// Type is the preheating type, support image and file.
Type string `json:"type"`
// URL is the image url for preheating.
URL string `json:"url"`
// Tag is the tag for preheating.
Tag string `json:"tag"`
// FilteredQueryParams is the filtered query params for preheating.
FilteredQueryParams string `json:"filtered_query_params"`
// Headers is the http headers for authentication.
Headers map[string]string `json:"headers"`
// Scope is the scope for preheating, default is single_peer.
Scope string `json:"scope"`
// BatchSize is the batch size for preheating all peers, default is 50.
ConcurrentCount int64 `json:"concurrent_count"`
// Timeout is the timeout for preheating, default is 30 minutes.
Timeout time.Duration `json:"timeout"`
}
type dragonflyJobResponse struct {
// ID is the job id.
ID int `json:"id"`
// CreatedAt is the job created time.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is the job updated time.
UpdatedAt time.Time `json:"updated_at"`
// State is the job state, support PENDING, SUCCESS, FAILURE.
State string `json:"state"`
// Results is the job results.
Result struct {
// JobStates is the job states, including each job state.
JobStates []struct {
// Error is the job error message.
Error string `json:"error"`
// Results is the job results.
Results []struct {
// SuccessTasks is the success tasks.
SuccessTasks []*struct {
// URL is the url of the task, which is the blob url.
URL string `json:"url"`
// Hostname is the hostname of the task.
Hostname string `json:"hostname"`
// IP is the ip of the task.
IP string `json:"ip"`
} `json:"success_tasks"`
// FailureTasks is the failure tasks.
FailureTasks []*struct {
// URL is the url of the task, which is the blob url.
URL string `json:"url"`
// Hostname is the hostname of the task.
Hostname string `json:"hostname"`
// IP is the ip of the task.
IP string `json:"ip"`
// Description is the failure description.
Description string `json:"description"`
} `json:"failure_tasks"`
// SchedulerClusterID is the scheduler cluster id.
SchedulerClusterID uint `json:"scheduler_cluster_id"`
} `json:"results"`
} `json:"job_states"`
} `json:"result"`
}
// DragonflyDriver implements the provider driver interface for Alibaba dragonfly.
@ -59,10 +161,10 @@ func (dd *DragonflyDriver) Self() *Metadata {
return &Metadata{
ID: "dragonfly",
Name: "Dragonfly",
Icon: "https://raw.githubusercontent.com/alibaba/Dragonfly/master/docs/images/logo.png",
Version: "0.10.1",
Source: "https://github.com/alibaba/Dragonfly",
Maintainers: []string{"Jin Zhang/taiyun.zj@alibaba-inc.com"},
Icon: "https://raw.githubusercontent.com/dragonflyoss/Dragonfly2/master/docs/images/logo/dragonfly-linear.png",
Version: "2.1.57",
Source: "https://github.com/dragonflyoss/Dragonfly2",
Maintainers: []string{"chlins.zhang@gmail.com", "gaius.qi@gmail.com"},
}
}
@ -72,13 +174,13 @@ func (dd *DragonflyDriver) GetHealth() (*DriverStatus, error) {
return nil, errors.New("missing instance metadata")
}
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), healthCheckEndpoint)
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyHealthPath)
url, err := lib.ValidateHTTPURL(url)
if err != nil {
return nil, err
}
_, err = client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil)
if err != nil {
if _, err = client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil); err != nil {
// Unhealthy
return nil, err
}
@ -99,97 +201,112 @@ func (dd *DragonflyDriver) Preheat(preheatingImage *PreheatImage) (*PreheatingSt
return nil, errors.New("no image specified")
}
taskStatus := provider.PreheatingStatusPending // default
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), preheatEndpoint)
bytes, err := client.GetHTTPClient(dd.instance.Insecure).Post(url, dd.getCred(), preheatingImage, nil)
if err != nil {
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusAlreadyReported {
// If the resource was preheated already with empty task ID, we should set preheat status to success.
// Otherwise later querying for the task
taskStatus = provider.PreheatingStatusSuccess
} else {
return nil, err
}
// Construct the preheat job request by the given parameters of the preheating image .
req := &dragonflyCreateJobRequest{
Type: "preheat",
// TODO: Support set SchedulerClusterIDs, FilteredQueryParam, ConcurrentCount and Timeout.
Args: dragonflyCreateJobRequestArgs{
Type: preheatingImage.Type,
URL: preheatingImage.URL,
Headers: headerToMapString(preheatingImage.Headers),
Scope: preheatingImage.Scope,
},
}
result := &dragonflyPreheatCreateResp{}
if err := json.Unmarshal(bytes, result); err != nil {
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyJobPath)
data, err := client.GetHTTPClient(dd.instance.Insecure).Post(url, dd.getCred(), req, nil)
if err != nil {
return nil, err
}
resp := &dragonflyJobResponse{}
if err := json.Unmarshal(data, resp); err != nil {
return nil, err
}
return &PreheatingStatus{
TaskID: result.ID,
Status: taskStatus,
TaskID: fmt.Sprintf("%d", resp.ID),
Status: provider.PreheatingStatusPending,
StartTime: resp.CreatedAt.Format(time.RFC3339),
FinishTime: resp.UpdatedAt.Format(time.RFC3339),
}, nil
}
// CheckProgress implements @Driver.CheckProgress.
func (dd *DragonflyDriver) CheckProgress(taskID string) (*PreheatingStatus, error) {
status, err := dd.getProgressStatus(taskID)
if err != nil {
return nil, err
}
// If preheat job already exists
if strings.Contains(status.ErrorMsg, "preheat task already exists, id:") {
if taskID, err = getTaskExistedFromErrMsg(status.ErrorMsg); err != nil {
return nil, err
}
if status, err = dd.getProgressStatus(taskID); err != nil {
return nil, err
}
}
if status.Status == dragonflyPending {
status.Status = provider.PreheatingStatusPending
} else if status.Status == dragonflyFailed {
status.Status = provider.PreheatingStatusFail
}
res := &PreheatingStatus{
Status: status.Status,
TaskID: taskID,
}
if status.StartTime != "" {
res.StartTime = status.StartTime
}
if status.FinishTime != "" {
res.FinishTime = status.FinishTime
}
return res, nil
}
func getTaskExistedFromErrMsg(msg string) (string, error) {
begin := strings.Index(msg, "preheat task already exists, id:") + 32
end := strings.LastIndex(msg, "\"}")
if end-begin <= 0 {
return "", errors.Errorf("can't find existed task id by error msg:%s", msg)
}
return msg[begin:end], nil
}
func (dd *DragonflyDriver) getProgressStatus(taskID string) (*dragonflyPreheatInfo, error) {
if dd.instance == nil {
return nil, errors.New("missing instance metadata")
}
if len(taskID) == 0 {
if taskID == "" {
return nil, errors.New("no task ID")
}
path := strings.Replace(preheatTaskEndpoint, "{task_id}", taskID, 1)
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), path)
bytes, err := client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil)
url := fmt.Sprintf("%s%s/%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyJobPath, taskID)
data, err := client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil)
if err != nil {
return nil, err
}
status := &dragonflyPreheatInfo{}
if err := json.Unmarshal(bytes, status); err != nil {
resp := &dragonflyJobResponse{}
if err := json.Unmarshal(data, resp); err != nil {
return nil, err
}
return status, nil
var (
successMessage string
errorMessage string
)
var state string
switch resp.State {
case dragonflyJobPendingState:
state = provider.PreheatingStatusRunning
case dragonflyJobSuccessState:
state = provider.PreheatingStatusSuccess
var buffer bytes.Buffer
table := tablewriter.NewWriter(&buffer)
table.SetHeader([]string{"Blob URL", "Hostname", "IP", "Cluster ID", "State", "Error Message"})
for _, jobState := range resp.Result.JobStates {
for _, result := range jobState.Results {
// Write the success tasks records to the table.
for _, successTask := range result.SuccessTasks {
table.Append([]string{successTask.URL, successTask.Hostname, successTask.IP, fmt.Sprint(result.SchedulerClusterID), dragonflyJobSuccessState, ""})
}
// Write the failure tasks records to the table.
for _, failureTask := range result.FailureTasks {
table.Append([]string{failureTask.URL, failureTask.Hostname, failureTask.IP, fmt.Sprint(result.SchedulerClusterID), dragonflyJobFailureState, failureTask.Description})
}
}
}
table.Render()
successMessage = buffer.String()
case dragonflyJobFailureState:
var errs errors.Errors
state = provider.PreheatingStatusFail
for _, jobState := range resp.Result.JobStates {
errs = append(errs, errors.New(jobState.Error))
}
if len(errs) > 0 {
errorMessage = errs.Error()
}
default:
state = provider.PreheatingStatusFail
errorMessage = fmt.Sprintf("unknown state: %s", resp.State)
}
return &PreheatingStatus{
TaskID: fmt.Sprintf("%d", resp.ID),
Status: state,
Message: successMessage,
Error: errorMessage,
StartTime: resp.CreatedAt.Format(time.RFC3339),
FinishTime: resp.UpdatedAt.Format(time.RFC3339),
}, nil
}
func (dd *DragonflyDriver) getCred() *auth.Credential {
@ -198,3 +315,14 @@ func (dd *DragonflyDriver) getCred() *auth.Credential {
Data: dd.instance.AuthInfo,
}
}
func headerToMapString(header map[string]interface{}) map[string]string {
m := make(map[string]string)
for k, v := range header {
if s, ok := v.(string); ok {
m[k] = s
}
}
return m
}

View File

@ -79,64 +79,41 @@ func (suite *DragonflyTestSuite) TestGetHealth() {
// TestPreheat tests Preheat method.
func (suite *DragonflyTestSuite) TestPreheat() {
// preheat first time
st, err := suite.driver.Preheat(&PreheatImage{
Type: "image",
ImageName: "busybox",
Tag: "latest",
URL: "https://harbor.com",
Digest: "sha256:f3c97e3bd1e27393eb853a5c90b1132f2cda84336d5ba5d100c720dc98524c82",
Scope: "single_peer",
})
require.NoError(suite.T(), err, "preheat image")
suite.Equal("dragonfly-id", st.TaskID, "preheat image result")
// preheat the same image second time
st, err = suite.driver.Preheat(&PreheatImage{
Type: "image",
ImageName: "busybox",
Tag: "latest",
URL: "https://harbor.com",
Digest: "sha256:f3c97e3bd1e27393eb853a5c90b1132f2cda84336d5ba5d100c720dc98524c82",
})
require.NoError(suite.T(), err, "preheat image")
suite.Equal("", st.TaskID, "preheat image result")
// preheat image digest is empty
st, err = suite.driver.Preheat(&PreheatImage{
ImageName: "",
})
require.Error(suite.T(), err, "preheat image")
suite.Equal(provider.PreheatingStatusPending, st.Status, "preheat status")
suite.Equal("0", st.TaskID, "task id")
suite.NotEmptyf(st.StartTime, "start time")
suite.NotEmptyf(st.FinishTime, "finish time")
}
// TestCheckProgress tests CheckProgress method.
func (suite *DragonflyTestSuite) TestCheckProgress() {
st, err := suite.driver.CheckProgress("dragonfly-id")
require.NoError(suite.T(), err, "get preheat status")
st, err := suite.driver.CheckProgress("1")
require.NoError(suite.T(), err, "get image")
suite.Equal(provider.PreheatingStatusRunning, st.Status, "preheat status")
suite.Equal("1", st.TaskID, "task id")
suite.NotEmptyf(st.StartTime, "start time")
suite.NotEmptyf(st.FinishTime, "finish time")
st, err = suite.driver.CheckProgress("2")
require.NoError(suite.T(), err, "get image")
suite.Equal(provider.PreheatingStatusSuccess, st.Status, "preheat status")
suite.Equal("2", st.TaskID, "task id")
suite.NotEmptyf(st.StartTime, "start time")
suite.NotEmptyf(st.FinishTime, "finish time")
// preheat job exit but returns no id
st, err = suite.driver.CheckProgress("preheat-job-exist-with-no-id")
require.Error(suite.T(), err, "get preheat status")
// preheat job exit returns id but get info with that failed
st, err = suite.driver.CheckProgress("preheat-job-exist-with-id-1")
require.Error(suite.T(), err, "get preheat status")
// preheat job normal failed
st, err = suite.driver.CheckProgress("preheat-job-normal-failed")
require.NoError(suite.T(), err, "get preheat status")
st, err = suite.driver.CheckProgress("3")
require.NoError(suite.T(), err, "get image")
suite.Equal(provider.PreheatingStatusFail, st.Status, "preheat status")
// instance is empty
testDriver := &DragonflyDriver{}
st, err = testDriver.CheckProgress("")
require.Error(suite.T(), err, "get preheat status")
// preheat job with no task id
st, err = suite.driver.CheckProgress("")
require.Error(suite.T(), err, "get preheat status")
// preheat job with err json response
st, err = suite.driver.CheckProgress("preheat-job-err-body-json")
require.Error(suite.T(), err, "get preheat status")
suite.Equal("3", st.TaskID, "task id")
suite.NotEmptyf(st.StartTime, "start time")
suite.NotEmptyf(st.FinishTime, "finish time")
}

View File

@ -77,6 +77,7 @@ type DriverStatus struct {
type PreheatingStatus struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
StartTime string `json:"start_time"`
FinishTime string `json:"finish_time"`

View File

@ -16,10 +16,10 @@ package provider
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"time"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/notification"
@ -32,126 +32,146 @@ var preheatMap = make(map[string]struct{})
func MockDragonflyProvider() *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case healthCheckEndpoint:
case dragonflyHealthPath:
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
w.WriteHeader(http.StatusOK)
case preheatEndpoint:
case dragonflyJobPath:
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotImplemented)
return
}
data, err := io.ReadAll(r.Body)
var resp = &dragonflyJobResponse{
ID: 0,
State: dragonflyJobPendingState,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
bytes, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
image := &PreheatImage{}
if err := json.Unmarshal(data, image); err != nil {
if _, err := w.Write(bytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if image.ImageName == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
if _, ok := preheatMap[image.Digest]; ok {
w.WriteHeader(http.StatusAlreadyReported)
_, _ = w.Write([]byte(`{"ID":""}`))
return
}
preheatMap[image.Digest] = struct{}{}
if image.Type == "image" &&
image.URL == "https://harbor.com" &&
image.ImageName == "busybox" &&
image.Tag == "latest" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"ID":"dragonfly-id"}`))
case fmt.Sprintf("%s/%s", dragonflyJobPath, "0"):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
w.WriteHeader(http.StatusBadRequest)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "dragonfly-id", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
status := &dragonflyPreheatInfo{
ID: "dragonfly-id",
StartTime: time.Now().UTC().String(),
FinishTime: time.Now().Add(5 * time.Minute).UTC().String(),
Status: "SUCCESS",
}
bytes, _ := json.Marshal(status)
_, _ = w.Write(bytes)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-exist-with-no-id", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
status := &dragonflyPreheatInfo{
ID: "preheat-exist-with-no-id",
StartTime: time.Now().UTC().String(),
FinishTime: time.Now().Add(5 * time.Minute).UTC().String(),
Status: "FAILED",
ErrorMsg: "{\"Code\":208,\"Msg\":\"preheat task already exists, id:\"}",
}
bytes, _ := json.Marshal(status)
_, _ = w.Write(bytes)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-normal-failed", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
status := &dragonflyPreheatInfo{
ID: "preheat-job-exist-with-id-1",
StartTime: time.Now().UTC().String(),
FinishTime: time.Now().Add(5 * time.Minute).UTC().String(),
Status: "FAILED",
ErrorMsg: "{\"Code\":208,\"Msg\":\"some msg\"}",
}
bytes, _ := json.Marshal(status)
_, _ = w.Write(bytes)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-exist-with-id-1", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
status := &dragonflyPreheatInfo{
ID: "preheat-job-exist-with-id-1",
StartTime: time.Now().UTC().String(),
FinishTime: time.Now().Add(5 * time.Minute).UTC().String(),
Status: "FAILED",
ErrorMsg: "{\"Code\":208,\"Msg\":\"preheat task already exists, id:preheat-job-exist-with-id-1-1\"}",
}
bytes, _ := json.Marshal(status)
_, _ = w.Write(bytes)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-exist-with-id-1-1", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
var resp = &dragonflyJobResponse{
ID: 1,
State: dragonflyJobSuccessState,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
bytes, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-err-body-json", 1):
_, _ = w.Write([]byte(err.Error()))
return
}
if _, err := w.Write(bytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
case fmt.Sprintf("%s/%s", dragonflyJobPath, "1"):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
bodyStr := "\"err body\""
_, _ = w.Write([]byte(bodyStr))
default:
var resp = &dragonflyJobResponse{
ID: 1,
State: dragonflyJobPendingState,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
bytes, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if _, err := w.Write(bytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
case fmt.Sprintf("%s/%s", dragonflyJobPath, "2"):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
var resp = &dragonflyJobResponse{
ID: 2,
State: dragonflyJobSuccessState,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
bytes, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if _, err := w.Write(bytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
case fmt.Sprintf("%s/%s", dragonflyJobPath, "3"):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
var resp = &dragonflyJobResponse{
ID: 3,
State: dragonflyJobFailureState,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
bytes, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if _, err := w.Write(bytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
}
}))
}

View File

@ -45,6 +45,9 @@ type PreheatImage struct {
// Digest of the preheating image
Digest string `json:"digest"`
// Scope indicates the preheat scope.
Scope string `json:"scope,omitempty"`
}
// FromJSON build preheating image from the given data.

View File

@ -457,6 +457,30 @@
(inputvalue)="setCron($event)"></cron-selection>
</div>
</div>
<!-- scope -->
<div class="clr-form-control">
<clr-select-container>
<label class="clr-control-label width-6rem">
{{ 'P2P_PROVIDER.SCOPE' | translate }}
</label>
<select
class="width-380"
[disabled]="loading"
clrSelect
name="scope"
id="scope"
[(ngModel)]="scope"
#ngScope="ngModel">
<option class="display-none" value=""></option>
<option
[selected]="policy.scope === item"
*ngFor="let item of scopes"
value="{{ item }}">
{{ getScopeI18n(item) | translate }}
</option>
</select>
</clr-select-container>
</div>
</section>
</form>
<div class="mt-1 bottom-btn" *ngIf="!isEdit">

View File

@ -29,6 +29,8 @@ import {
PROJECT_SEVERITY_LEVEL_MAP,
TRIGGER,
TRIGGER_I18N_MAP,
SCOPE,
SCOPE_I18N_MAP,
} from '../p2p-provider.service';
import { ProviderUnderProject } from '../../../../../../ng-swagger-gen/models/provider-under-project';
import { AppConfigService } from '../../../../services/app-config.service';
@ -73,6 +75,7 @@ export class AddP2pPolicyComponent implements OnInit, OnDestroy {
severity: number;
labels: string;
triggerType: string = TRIGGER.MANUAL;
scope: string = SCOPE.SINGLE_PEER;
cron: string;
@ViewChild('policyForm', { static: true }) currentForm: NgForm;
loading: boolean = false;
@ -96,6 +99,7 @@ export class AddP2pPolicyComponent implements OnInit, OnDestroy {
TRIGGER.SCHEDULED,
TRIGGER.EVENT_BASED,
];
scopes: string[] = [SCOPE.SINGLE_PEER, SCOPE.ALL_PEERS];
enableContentTrust: boolean = false;
private _nameSubject: Subject<string> = new Subject<string>();
private _nameSubscription: Subscription;
@ -198,6 +202,7 @@ export class AddP2pPolicyComponent implements OnInit, OnDestroy {
}
this.currentForm.reset({
triggerType: 'manual',
scope: 'single_peer',
severity: PROJECT_SEVERITY_LEVEL_MAP[this.projectSeverity],
onlySignedImages: this.enableContentTrust,
provider: this.policy.provider_id,
@ -303,6 +308,7 @@ export class AddP2pPolicyComponent implements OnInit, OnDestroy {
policy.trigger = JSON.stringify(trigger);
this.loading = true;
this.buttonStatus = ClrLoadingState.LOADING;
policy.scope = this.scope ? this.scope : SCOPE.SINGLE_PEER;
deleteEmptyKey(policy);
if (isAdd) {
policy.project_id = this.projectId;
@ -404,6 +410,10 @@ export class AddP2pPolicyComponent implements OnInit, OnDestroy {
return true;
}
// eslint-disable-next-line eqeqeq
if (this.policy.scope != this.scope) {
return true;
}
// eslint-disable-next-line eqeqeq
return this.originCronForEdit != this.cron;
}
isSystemAdmin(): boolean {
@ -417,6 +427,14 @@ export class AddP2pPolicyComponent implements OnInit, OnDestroy {
}
return '';
}
getScopeI18n(scope): string {
if (scope) {
return SCOPE_I18N_MAP[scope];
}
return '';
}
showCron(): boolean {
if (this.triggerType) {
return this.triggerType === TRIGGER.SCHEDULED;

View File

@ -77,6 +77,16 @@ export const TRIGGER_I18N_MAP = {
'scheduled(paused)': 'JOB_SERVICE_DASHBOARD.SCHEDULE_PAUSED',
};
export enum SCOPE {
SINGLE_PEER = 'single_peer',
ALL_PEERS = 'all_peers',
}
export const SCOPE_I18N_MAP = {
single_peer: 'P2P_PROVIDER.SCOPE_SINGLE_PEER',
all_peers: 'P2P_PROVIDER.SCOPE_ALL_PEERS',
};
export const TIME_OUT: number = 7000;
export const PROJECT_SEVERITY_LEVEL_MAP = {

View File

@ -490,6 +490,7 @@ export class PolicyComponent implements OnInit, OnDestroy {
severity: this.addP2pPolicyComponent.severity,
label: this.addP2pPolicyComponent.labels,
triggerType: this.addP2pPolicyComponent.triggerType,
scope: this.addP2pPolicyComponent.scope,
});
this.addP2pPolicyComponent.originPolicyForEdit = clone(
this.selectedRow

View File

@ -1625,6 +1625,9 @@
"TRIGGER": "Trigger",
"CREATED": "Erzeugt am",
"DESCRIPTION": "Beschreibung",
"SCOPE": "Umfang",
"SCOPE_SINGLE_PEER": "Einzelner Peer",
"SCOPE_ALL_PEERS": "Alle Peers",
"NO_POLICY": "Keine Regelwerke",
"ENABLED_POLICY_SUMMARY": "Soll das Regelwerk {{name}} aktiviert werden?",
"DISABLED_POLICY_SUMMARY": "Soll das Regelwerk {{name}} deaktiviert werden?",

View File

@ -1628,6 +1628,9 @@
"TRIGGER": "Trigger",
"CREATED": "Creation Time",
"DESCRIPTION": "Description",
"SCOPE": "Scope",
"SCOPE_SINGLE_PEER": "Single Peer",
"SCOPE_ALL_PEERS": "All Peers",
"NO_POLICY": "No policy",
"ENABLED_POLICY_SUMMARY": "Do you want to enable policy {{name}}?",
"DISABLED_POLICY_SUMMARY": "Do you want to deactivate policy {{name}}?",

View File

@ -1622,6 +1622,9 @@
"TRIGGER": "Trigger",
"CREATED": "Creation Time",
"DESCRIPTION": "Description",
"SCOPE": "Scope",
"SCOPE_SINGLE_PEER": "Single Peer",
"SCOPE_ALL_PEERS": "All Peers",
"NO_POLICY": "No policy",
"ENABLED_POLICY_SUMMARY": "Do you want to enable policy {{name}}?",
"DISABLED_POLICY_SUMMARY": "Do you want to disable policy {{name}}?",

View File

@ -1625,6 +1625,9 @@
"TRIGGER": "Déclencheur",
"CREATED": "Date/Heure de création",
"DESCRIPTION": "Description",
"SCOPE": "Champ d'application",
"SCOPE_SINGLE_PEER": "Pair unique",
"SCOPE_ALL_PEERS": "Tous les pairs",
"NO_POLICY": "Aucune stratégie",
"ENABLED_POLICY_SUMMARY": "Voulez-vous activer la stratégie {{name}} ?",
"DISABLED_POLICY_SUMMARY": "Voulez-vous désactiver la stratégie {{name}} ?",

View File

@ -1619,6 +1619,9 @@
"TRIGGER": "트리거",
"CREATED": "생성 시간",
"DESCRIPTION": "설명",
"SCOPE": "범위",
"SCOPE_SINGLE_PEER": "싱글 피어",
"SCOPE_ALL_PEERS": "모든 피어",
"NO_POLICY": "정책 없음",
"ENABLED_POLICY_SUMMARY": "정책{{name}}을 활성화하시겠습니까?",
"DISABLED_POLICY_SUMMARY": "정책{{name}}을 비활성화하시겠습니까?",

View File

@ -1622,6 +1622,9 @@
"TRIGGER": "Disparo",
"CREATED": "Criado em",
"DESCRIPTION": "Descrição",
"SCOPE": "Escopo",
"SCOPE_SINGLE_PEER": "Par único",
"SCOPE_ALL_PEERS": "Todos os pares",
"NO_POLICY": "Nenhuma política",
"ENABLED_POLICY_SUMMARY": "Gostaria de habilitar a política {{name}}?",
"DISABLED_POLICY_SUMMARY": "Gostaria de desabilitar a política {{name}}?",

View File

@ -1625,6 +1625,9 @@
"TRIGGER": "Trigger",
"CREATED": "Creation Time",
"DESCRIPTION": "Description",
"SCOPE": "Scope",
"SCOPE_SINGLE_PEER": "Single Peer",
"SCOPE_ALL_PEERS": "All Peers",
"NO_POLICY": "No policy",
"ENABLED_POLICY_SUMMARY": "Do you want to enable policy {{name}}?",
"DISABLED_POLICY_SUMMARY": "Do you want to disable policy {{name}}?",

View File

@ -1624,6 +1624,9 @@
"TRIGGER": "触发器",
"CREATED": "创建时间",
"DESCRIPTION": "描述",
"SCOPE": "范围",
"SCOPE_SINGLE_PEER": "单节点",
"SCOPE_ALL_PEERS": "全节点",
"NO_POLICY": "暂无记录",
"ENABLED_POLICY_SUMMARY": "是否启用策略 {{name}}?",
"DISABLED_POLICY_SUMMARY": "是否禁用策略 {{name}}?",

View File

@ -1620,6 +1620,9 @@
"TRIGGER": "觸發器",
"CREATED": "建立時間",
"DESCRIPTION": "描述",
"SCOPE": "範圍",
"SCOPE_SINGLE_PEER": "單節點",
"SCOPE_ALL_PEERS": "全節點",
"NO_POLICY": "無原則",
"ENABLED_POLICY_SUMMARY": "您是否要啟用原則 {{name}}",
"DISABLED_POLICY_SUMMARY": "您是否要停用原則 {{name}}",

View File

@ -483,6 +483,7 @@ func convertPolicyToPayload(policy *policy.Schema) (*models.PreheatPolicy, error
ProjectID: policy.ProjectID,
ProviderID: policy.ProviderID,
Trigger: policy.TriggerStr,
Scope: policy.Scope,
UpdateTime: strfmt.DateTime(policy.UpdatedTime),
}, nil
}
@ -511,6 +512,7 @@ func convertParamPolicyToModelPolicy(model *models.PreheatPolicy) (*policy.Schem
FiltersStr: model.Filters,
TriggerStr: model.Trigger,
Enabled: model.Enabled,
Scope: model.Scope,
CreatedAt: time.Time(model.CreationTime),
UpdatedTime: time.Time(model.UpdateTime),
}, nil

View File

@ -39,7 +39,7 @@ func Test_convertProvidersToFrontend(t *testing.T) {
{"",
backend,
[]*models.Metadata{
{ID: "dragonfly", Icon: "https://raw.githubusercontent.com/alibaba/Dragonfly/master/docs/images/logo.png", Maintainers: []string{"Jin Zhang/taiyun.zj@alibaba-inc.com"}, Name: "Dragonfly", Source: "https://github.com/alibaba/Dragonfly", Version: "0.10.1"},
{ID: "dragonfly", Icon: "https://raw.githubusercontent.com/dragonflyoss/Dragonfly2/master/docs/images/logo/dragonfly-linear.png", Maintainers: []string{"chlins.zhang@gmail.com", "gaius.qi@gmail.com"}, Name: "Dragonfly", Source: "https://github.com/dragonflyoss/Dragonfly2", Version: "2.1.57"},
{Icon: "https://github.com/uber/kraken/blob/master/assets/kraken-logo-color.svg", ID: "kraken", Maintainers: []string{"mmpei/peimingming@corp.netease.com"}, Name: "Kraken", Source: "https://github.com/uber/kraken", Version: "0.1.3"},
},
},
@ -79,6 +79,7 @@ func Test_convertPolicyToPayload(t *testing.T) {
Trigger: nil,
TriggerStr: "",
Enabled: false,
Scope: "all_peers",
CreatedAt: time.Time{},
UpdatedTime: time.Time{},
},
@ -92,6 +93,7 @@ func Test_convertPolicyToPayload(t *testing.T) {
ProjectID: 0,
ProviderID: 0,
Trigger: "",
Scope: "all_peers",
UpdateTime: strfmt.DateTime{},
},
},
@ -141,6 +143,7 @@ func Test_convertParamPolicyToModelPolicy(t *testing.T) {
ProjectID: 0,
ProviderID: 0,
Trigger: "",
Scope: "single_peer",
UpdateTime: strfmt.DateTime{},
},
expect: &policy.Schema{
@ -154,6 +157,7 @@ func Test_convertParamPolicyToModelPolicy(t *testing.T) {
Trigger: nil,
TriggerStr: "",
Enabled: false,
Scope: "single_peer",
CreatedAt: time.Time{},
UpdatedTime: time.Time{},
},