Merge pull request #12338 from steven-zou/feat/p2p_preheat_job

feat(preheat):add preheat job
This commit is contained in:
Steven Zou 2020-06-29 12:27:05 +08:00 committed by GitHub
commit 8e2c334b43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 614 additions and 127 deletions

242
src/pkg/p2p/preheat/job.go Normal file
View File

@ -0,0 +1,242 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package preheat
import (
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
pr "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
)
const (
// parameter keeps the preheating provider instance info.
preheatParamProvider = "provider"
// parameter keeps the preheating artifact (image) info.
preheatParamImage = "image"
// checkInterval indicates the interval of loop check.
checkInterval = 10 * time.Second
// checkTimeout indicates the overall timeout of the loop check.
checkTimeout = 1801 * time.Second
)
// Job preheats the given artifact(image) to the target preheat provider.
type Job struct{}
// MaxFails of preheat job. Don't need to retry.
func (j *Job) MaxFails() uint {
return 1
}
// MaxCurrency indicates no limitation to the concurrency of preheat job.
func (j *Job) MaxCurrency() uint {
return 0
}
// ShouldRetry indicates no need to retry preheat job as it's just for a cache purpose.
func (j *Job) ShouldRetry() bool {
return false
}
// Validate the parameters of preheat job.
func (j *Job) Validate(params job.Parameters) error {
_, err := parseParamProvider(params)
if err != nil {
return err
}
_, err = parseParamImage(params)
return err
}
// Run the preheat process.
func (j *Job) Run(ctx job.Context, params job.Parameters) error {
// Get logger
myLogger := ctx.GetLogger()
// Parse parameters, ignore errors as they have been validated already
p, _ := parseParamProvider(params)
pi, _ := parseParamImage(params)
// Print related info to log first
myLogger.Infof(
"Preheating image '%s:%s' to the target preheat provider: %s %s:%s",
pi.ImageName,
pi.Tag,
p.Vendor,
p.Name,
p.Endpoint,
)
// Get driver factory for the given provider
fac, ok := pr.GetProvider(p.Vendor)
if !ok {
err := errors.Errorf("No driver registered for provider %s", p.Vendor)
myLogger.Error(err)
return preheatJobRunningError(err)
}
// Construct driver
d, err := fac(p)
if err != nil {
myLogger.Error(err)
return preheatJobRunningError(err)
}
myLogger.Infof("Get preheat provider driver: %s", p.Vendor)
// Start the preheat process
// First, check the health of the provider
h, err := d.GetHealth()
if err != nil {
myLogger.Error(err)
return preheatJobRunningError(err)
}
if h.Status != pr.DriverStatusHealthy {
err = errors.Errorf("unhealthy target preheat provider: %s", p.Vendor)
myLogger.Error(err)
return preheatJobRunningError(err)
}
myLogger.Infof("Check health of preheat provider instance: %s", pr.DriverStatusHealthy)
// Then send the preheat requests to the target provider.
st, err := d.Preheat(pi)
if err != nil {
myLogger.Error(err)
return preheatJobRunningError(err)
}
myLogger.Info("Sending preheat request is successfully done")
// For some of the drivers, e.g: Kraken, the returned status of preheating request contains the
// final status info. No need to loop check the status.
switch st.Status {
case provider.PreheatingStatusSuccess:
myLogger.Info("Preheating is completed")
return nil
case provider.PreheatingStatusFail:
err = errors.New("preheating is failed")
myLogger.Error(err)
return preheatJobRunningError(err)
case provider.PreheatingStatusPending,
provider.PreheatingStatusRunning:
// do nothing
default:
// in case
err = errors.Errorf("unknown status '%s' returned by the preheat provider %s-%s:%s", st.Status, p.Vendor, p.Name, p.Endpoint)
myLogger.Error(err)
return preheatJobRunningError(err)
}
myLogger.Info("Start to loop check the preheating status until it's ready or timeout(30m)")
// If process is not completed, loop check the status until it's ready.
tk := time.NewTicker(checkInterval)
defer tk.Stop()
tm := time.NewTimer(checkTimeout)
defer tm.Stop()
for {
select {
case <-tk.C:
s, err := d.CheckProgress(st.TaskID)
if err != nil {
myLogger.Error(err)
return preheatJobRunningError(err)
}
// Finished
if s.Status == provider.PreheatingStatusSuccess {
myLogger.Info("Preheating is completed")
return nil
}
case <-tm.C:
return preheatJobRunningError(errors.Errorf("status check timeout: %v", checkTimeout))
}
}
}
// preheatJobRunningError is an internal error format
func preheatJobRunningError(err error) error {
return errors.Wrap(err, "preheat job running error")
}
// parseParamProvider parses the provider param.
func parseParamProvider(params job.Parameters) (*provider.Instance, error) {
data, err := parseStrValue(params, preheatParamProvider)
if err != nil {
return nil, err
}
ins := &provider.Instance{}
if err := ins.FromJSON(data); err != nil {
return nil, errors.Wrap(err, "parse job parameter error")
}
// Validate required info
if len(ins.Vendor) == 0 {
return nil, errors.New("missing vendor of preheat provider")
}
if ins.AuthMode != auth.AuthModeNone && len(ins.AuthInfo) == 0 {
return nil, errors.Errorf("missing auth info for '%s' auth mode", ins.AuthMode)
}
if len(ins.Endpoint) == 0 {
return nil, errors.Errorf("missing endpoint of preheat provider")
}
return ins, nil
}
// parseParamImage parses the preheating image param.
func parseParamImage(params job.Parameters) (*pr.PreheatImage, error) {
data, err := parseStrValue(params, preheatParamImage)
if err != nil {
return nil, err
}
img := &pr.PreheatImage{}
if err := img.FromJSON(data); err != nil {
return nil, errors.Wrap(err, "parse job parameter error")
}
if err := img.Validate(); err != nil {
return nil, errors.Wrap(err, "parse job parameter error")
}
return img, nil
}
// parseStrValue parses the string data of the given parameter key from the job parameters.
func parseStrValue(params job.Parameters, key string) (string, error) {
param, ok := params[key]
if !ok || param == nil {
return "", errors.Errorf("missing job parameter '%s'", key)
}
data, ok := param.(string)
if !ok || len(data) == 0 {
return "", errors.Errorf("bad job parameter '%s'", key)
}
return data, nil
}

View File

@ -0,0 +1,115 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package preheat
import (
"net/http/httptest"
"testing"
"github.com/goharbor/harbor/src/jobservice/job"
p "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
"github.com/goharbor/harbor/src/testing/jobservice"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
// JobTestSuite is test suite of preheating job.
type JobTestSuite struct {
suite.Suite
dragonfly *httptest.Server
kraken *httptest.Server
context job.Context
preheatingImage *provider.PreheatImage
}
// TestJob is the entry method of JobTestSuite
func TestJob(t *testing.T) {
suite.Run(t, &JobTestSuite{})
}
// SetupSuite prepares the env for JobTestSuite.
func (suite *JobTestSuite) SetupSuite() {
suite.dragonfly = provider.MockDragonflyProvider()
suite.dragonfly.StartTLS()
suite.kraken = provider.MockKrakenProvider()
suite.kraken.StartTLS()
suite.preheatingImage = &provider.PreheatImage{
Type: "image",
ImageName: "busybox",
Tag: "latest",
URL: "https://harbor.com",
Headers: map[string]interface{}{
"robot$my": "jwt-token",
},
}
ctx := &jobservice.MockJobContext{}
logger := &jobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
suite.context = ctx
}
// TearDownSuite clears the env for JobTestSuite.
func (suite *JobTestSuite) TearDownSuite() {
suite.dragonfly.Close()
suite.kraken.Close()
}
// TestJobWithDragonflyDriver test preheat job running with Dragonfly driver.
func (suite *JobTestSuite) TestJobWithDragonflyDriver() {
ins := &p.Instance{
ID: 1,
Name: "test-instance",
Vendor: provider.DriverDragonfly,
Endpoint: suite.dragonfly.URL,
AuthMode: auth.AuthModeNone,
Enabled: true,
Default: true,
Insecure: true,
Status: provider.DriverStatusHealthy,
}
suite.runJob(ins)
}
func (suite *JobTestSuite) validateJob(j job.Interface, params job.Parameters) {
require.Equal(suite.T(), uint(1), j.MaxFails(), "max fails")
require.Equal(suite.T(), false, j.ShouldRetry(), "should retry")
require.Equal(suite.T(), uint(0), j.MaxCurrency(), "max concurrency")
require.NoError(suite.T(), j.Validate(params), "validate job parameters")
}
func (suite *JobTestSuite) runJob(ins *p.Instance) {
params := make(job.Parameters)
data, err := ins.ToJSON()
require.NoError(suite.T(), err, "encode parameter", preheatParamProvider)
params[preheatParamProvider] = data
data, err = suite.preheatingImage.ToJSON()
require.NoError(suite.T(), err, "encode parameter", preheatParamImage)
params[preheatParamImage] = data
j := &Job{}
suite.validateJob(j, params)
err = j.Run(suite.context, params)
suite.NoError(err, "run preheating job with driver %s", ins.Vendor)
}

View File

@ -14,6 +14,12 @@
package provider
import (
"encoding/json"
"github.com/goharbor/harbor/src/lib/errors"
)
const (
// PreheatingImageTypeImage defines the 'image' type of preheating images
PreheatingImageTypeImage = "image"
@ -46,3 +52,26 @@ type Instance struct {
Insecure bool `orm:"column(insecure)" json:"insecure"`
SetupTimestamp int64 `orm:"column(setup_timestamp)" json:"setup_timestamp"`
}
// FromJSON build instance from the given data.
func (ins *Instance) FromJSON(data string) error {
if len(data) == 0 {
return errors.New("empty JSON data")
}
if err := json.Unmarshal([]byte(data), ins); err != nil {
return errors.Wrap(err, "construct preheat instance error")
}
return nil
}
// ToJSON encodes the instance to JSON data.
func (ins *Instance) ToJSON() (string, error) {
data, err := json.Marshal(ins)
if err != nil {
return "", errors.Wrap(err, "encode preheat instance error")
}
return string(data), nil
}

View File

@ -15,13 +15,8 @@
package provider
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
@ -44,62 +39,7 @@ func TestDragonfly(t *testing.T) {
// SetupSuite prepares the env for DragonflyTestSuite.
func (suite *DragonflyTestSuite) SetupSuite() {
suite.dragonfly = httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case healthCheckEndpoint:
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
w.WriteHeader(http.StatusOK)
case preheatEndpoint:
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotImplemented)
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
image := &PreheatImage{}
if err := json.Unmarshal(data, image); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if image.Type == "image" &&
image.URL == "https://harbor.com" &&
image.ImageName == "busybox" &&
image.Tag == "latest" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"ID":"dragonfly-id"}`))
return
}
w.WriteHeader(http.StatusBadRequest)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "dragonfly-id", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
status := &dragonflyPreheatInfo{
ID: "dragonfly-id",
StartTime: time.Now().UTC().String(),
FinishTime: time.Now().Add(5 * time.Minute).UTC().String(),
Status: "SUCCESS",
}
bytes, _ := json.Marshal(status)
_, _ = w.Write(bytes)
default:
w.WriteHeader(http.StatusNotImplemented)
}
}))
suite.dragonfly = MockDragonflyProvider()
suite.dragonfly.StartTLS()

View File

@ -49,24 +49,6 @@ type DriverStatus struct {
Status string `json:"status"`
}
// PreheatImage contains related information which can help providers to get/pull the images.
type PreheatImage struct {
// The image content type, only support 'image' now
Type string `json:"type"`
// The access URL of the preheating image
URL string `json:"url"`
// The headers which will be sent to the above URL of preheating image
Headers map[string]interface{} `json:"headers"`
// The image name
ImageName string `json:"image,omitempty"`
// The tag
Tag string `json:"tag,omitempty"`
}
// PreheatingStatus contains the related results/status of the preheating operation
// from the provider.
type PreheatingStatus struct {

View File

@ -33,8 +33,8 @@ func ListProviders() ([]*Metadata, error) {
// Sort results
if len(results) > 1 {
sort.SliceIsSorted(results, func(i, j int) bool {
return strings.Compare(results[i].ID, results[j].ID) > 0
sort.SliceStable(results, func(i, j int) bool {
return strings.Compare(results[i].ID, results[j].ID) < 0
})
}

View File

@ -15,13 +15,9 @@
package provider
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
cm "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
"github.com/stretchr/testify/require"
@ -43,48 +39,7 @@ func TestKraken(t *testing.T) {
// SetupSuite prepares the env for KrakenTestSuite.
func (suite *KrakenTestSuite) SetupSuite() {
suite.kraken = httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case krakenHealthPath:
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
w.WriteHeader(http.StatusOK)
case krakenPreheatPath:
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotImplemented)
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
var payload = &cm.Notification{
Events: []cm.Event{},
}
if err := json.Unmarshal(data, payload); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if len(payload.Events) > 0 {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusBadRequest)
default:
w.WriteHeader(http.StatusNotImplemented)
}
}))
suite.kraken = MockKrakenProvider()
suite.kraken.StartTLS()

View File

@ -0,0 +1,134 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package provider
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"time"
cm "github.com/goharbor/harbor/src/common/models"
)
// This is a package to provide mock utilities.
// MockDragonflyProvider mocks a Dragonfly server.
func MockDragonflyProvider() *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case healthCheckEndpoint:
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
w.WriteHeader(http.StatusOK)
case preheatEndpoint:
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotImplemented)
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
image := &PreheatImage{}
if err := json.Unmarshal(data, image); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if image.Type == "image" &&
image.URL == "https://harbor.com" &&
image.ImageName == "busybox" &&
image.Tag == "latest" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"ID":"dragonfly-id"}`))
return
}
w.WriteHeader(http.StatusBadRequest)
case strings.Replace(preheatTaskEndpoint, "{task_id}", "dragonfly-id", 1):
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
status := &dragonflyPreheatInfo{
ID: "dragonfly-id",
StartTime: time.Now().UTC().String(),
FinishTime: time.Now().Add(5 * time.Minute).UTC().String(),
Status: "SUCCESS",
}
bytes, _ := json.Marshal(status)
_, _ = w.Write(bytes)
default:
w.WriteHeader(http.StatusNotImplemented)
}
}))
}
// MockKrakenProvider mocks a Kraken server.
func MockKrakenProvider() *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case krakenHealthPath:
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotImplemented)
return
}
w.WriteHeader(http.StatusOK)
case krakenPreheatPath:
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotImplemented)
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
var payload = &cm.Notification{
Events: []cm.Event{},
}
if err := json.Unmarshal(data, payload); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if len(payload.Events) > 0 {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusBadRequest)
default:
w.WriteHeader(http.StatusNotImplemented)
}
}))
}

View File

@ -0,0 +1,90 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package provider
import (
"encoding/json"
"net/url"
"github.com/pkg/errors"
)
const (
// SupportedType indicates the supported preheating type 'image'.
SupportedType = "image"
)
// PreheatImage contains related information which can help providers to get/pull the images.
type PreheatImage struct {
// The image content type, only support 'image' now
Type string `json:"type"`
// The access URL of the preheating image
URL string `json:"url"`
// The headers which will be sent to the above URL of preheating image
Headers map[string]interface{} `json:"headers"`
// The image name
ImageName string `json:"image,omitempty"`
// The tag
Tag string `json:"tag,omitempty"`
}
// FromJSON build preheating image from the given data.
func (img *PreheatImage) FromJSON(data string) error {
if len(data) == 0 {
return errors.New("empty JSON data")
}
if err := json.Unmarshal([]byte(data), img); err != nil {
return errors.Wrap(err, "construct preheating image error")
}
return nil
}
// ToJSON encodes the preheating image to JSON data.
func (img *PreheatImage) ToJSON() (string, error) {
data, err := json.Marshal(img)
if err != nil {
return "", errors.Wrap(err, "encode preheating image error")
}
return string(data), nil
}
// Validate PreheatImage
func (img *PreheatImage) Validate() error {
if img.Type != SupportedType {
return errors.Errorf("unsupported type '%s'", img.Type)
}
if len(img.ImageName) == 0 || len(img.Tag) == 0 {
return errors.New("missing image repository or tag")
}
if len(img.Headers) == 0 {
return errors.New("missing required headers")
}
_, err := url.Parse(img.URL)
if err != nil {
return errors.Wrap(err, "malformed registry URL")
}
return nil
}