Merge pull request #12392 from steven-zou/fix/remove_inst_provider_todo

fix(p2p):remove the provider manager related to
This commit is contained in:
Steven Zou 2020-07-03 23:59:55 +08:00 committed by GitHub
commit 2f2c545a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 55 deletions

View File

@ -6,9 +6,9 @@ import (
"testing" "testing"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
imocks "github.com/goharbor/harbor/src/pkg/p2p/preheat/instance/mocks"
providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -18,12 +18,12 @@ type preheatSuite struct {
suite.Suite suite.Suite
ctx context.Context ctx context.Context
controller Controller controller Controller
fackManager *imocks.Manager fackManager *instance.FakeManager
} }
func TestPreheatSuite(t *testing.T) { func TestPreheatSuite(t *testing.T) {
t.Log("Start TestPreheatSuite") t.Log("Start TestPreheatSuite")
fackManager := &imocks.Manager{} fackManager := &instance.FakeManager{}
var c = &controller{ var c = &controller{
iManager: fackManager, iManager: fackManager,
@ -149,7 +149,7 @@ func (s *preheatSuite) TestUpdateInstance() {
} }
func (s *preheatSuite) TestGetInstance() { func (s *preheatSuite) TestGetInstance() {
instance, err := s.controller.GetInstance(s.ctx, 1) inst, err := s.controller.GetInstance(s.ctx, 1)
s.NoError(err) s.NoError(err)
s.NotNil(instance) s.NotNil(inst)
} }

View File

@ -33,6 +33,7 @@ import (
"github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/lib/selector" "github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/p2p/preheat" "github.com/goharbor/harbor/src/pkg/p2p/preheat"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/instance"
pol "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy" pol "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/policy" "github.com/goharbor/harbor/src/pkg/p2p/preheat/policy"
@ -112,8 +113,8 @@ type defaultEnforcer struct {
scanCtl scan.Controller scanCtl scan.Controller
// for getting project related info // for getting project related info
proCtl project.Controller proCtl project.Controller
// TODO: Need preheat provider manager // for getting provider instance
// instMgr instance.Manager
// for getting the access endpoint of registry V2 // for getting the access endpoint of registry V2
fullURLGetter extURLGetter fullURLGetter extURLGetter
// for creating the access credential // for creating the access credential
@ -129,6 +130,7 @@ func NewEnforcer() Enforcer {
artCtl: artifact.NewController(), artCtl: artifact.NewController(),
scanCtl: scan.DefaultController, scanCtl: scan.DefaultController,
proCtl: project.NewController(), proCtl: project.NewController(),
instMgr: instance.Mgr,
fullURLGetter: func(c *selector.Candidate) (s string, e error) { fullURLGetter: func(c *selector.Candidate) (s string, e error) {
edp, err := config.ExtEndpoint() edp, err := config.ExtEndpoint()
if err != nil { if err != nil {
@ -172,6 +174,16 @@ func (de *defaultEnforcer) EnforcePolicy(ctx context.Context, policyID int64) (i
return -1, enforceError(errors.Errorf("policy %d:%s is not enabled", pl.ID, pl.Name)) return -1, enforceError(errors.Errorf("policy %d:%s is not enabled", pl.ID, pl.Name))
} }
// Get and check if the provider instance bound with the policy is healthy
inst, err := de.instMgr.Get(ctx, pl.ProviderID)
if err != nil {
return -1, enforceError(err)
}
if err := checkProviderHealthy(inst); err != nil {
return -1, enforceError(err)
}
// Retrieve the initial candidates // Retrieve the initial candidates
candidates, err := de.getCandidates(ctx, pl) candidates, err := de.getCandidates(ctx, pl)
if err != nil { if err != nil {
@ -187,7 +199,7 @@ func (de *defaultEnforcer) EnforcePolicy(ctx context.Context, policyID int64) (i
} }
// Launch execution // Launch execution
eid, err := de.launchExecutions(ctx, filtered, pl) eid, err := de.launchExecutions(ctx, filtered, pl, inst)
if err != nil { if err != nil {
// NOTES: Please pay attention here, even the non-nil error returned, it does not mean // NOTES: Please pay attention here, even the non-nil error returned, it does not mean
// the relevant execution is not available. The execution ID should also be checked(>0) // the relevant execution is not available. The execution ID should also be checked(>0)
@ -237,6 +249,19 @@ func (de *defaultEnforcer) PreheatArtifact(ctx context.Context, art *artifact.Ar
continue continue
} }
// Get and check if the provider instance bound with the policy is healthy
inst, err := de.instMgr.Get(ctx, pl.ProviderID)
if err != nil {
logger.Errorf("Failed to get the preheat provider instance bound with the policy %d:%s with error: %s", pl.ID, pl.Name, err.Error())
continue
}
// Skip unhealthy instance
if err := checkProviderHealthy(inst); err != nil {
logger.Errorf("The preheat provider instance bound with the policy %d:%s is not healthy: %s", pl.ID, pl.Name, err.Error())
continue
}
filtered, err := policy.NewFilter().BuildFrom(pl).Filter(candidates) filtered, err := policy.NewFilter().BuildFrom(pl).Filter(candidates)
if err != nil { if err != nil {
// Log error and continue // Log error and continue
@ -248,7 +273,7 @@ func (de *defaultEnforcer) PreheatArtifact(ctx context.Context, art *artifact.Ar
if len(filtered) > 0 { if len(filtered) > 0 {
// Matched // Matched
eid, err := de.launchExecutions(ctx, filtered, pl) eid, err := de.launchExecutions(ctx, filtered, pl, inst)
if err != nil { if err != nil {
// Log error and continue // Log error and continue
logger.Errorf("Failed to launch execution for policy %d:%s with error: %s", pl.ID, pl.Name, err.Error()) logger.Errorf("Failed to launch execution for policy %d:%s with error: %s", pl.ID, pl.Name, err.Error())
@ -299,7 +324,7 @@ func (de *defaultEnforcer) getCandidates(ctx context.Context, ps *pol.Schema) ([
} }
// launchExecutions create execution record and launch tasks to preheat the filtered artifacts. // launchExecutions create execution record and launch tasks to preheat the filtered artifacts.
func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*selector.Candidate, pl *pol.Schema) (int64, error) { func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*selector.Candidate, pl *pol.Schema, inst *provider.Instance) (int64, error) {
// Create execution first anyway // Create execution first anyway
attrs := map[string]interface{}{ attrs := map[string]interface{}{
extraAttrTotal: len(candidates), extraAttrTotal: len(candidates),
@ -325,10 +350,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
return eid, nil return eid, nil
} }
// TODO: Get provider instance by the provider ID insData, err := inst.ToJSON()
// Placeholder
ins := &provider.Instance{}
insData, err := ins.ToJSON()
if err != nil { if err != nil {
// In case // In case
if er := de.executionMgr.MarkError(ctx, eid, err.Error()); er != nil { if er := de.executionMgr.MarkError(ctx, eid, err.Error()); er != nil {
@ -495,3 +517,30 @@ func getLabels(labels []*models.Label) []string {
return lt return lt
} }
// check the health of the given provider instance
func checkProviderHealthy(inst *provider.Instance) error {
// Get driver factory for the given provider
fac, ok := pr.GetProvider(inst.Vendor)
if !ok {
return errors.Errorf("no driver registered for provider %s", inst.Vendor)
}
// Construct driver
d, err := fac(inst)
if err != nil {
return err
}
// Check health
h, err := d.GetHealth()
if err != nil {
return err
}
if h.Status != pr.DriverStatusHealthy {
return errors.Errorf("preheat provider instance %s-%s:%s is not healthy", inst.Vendor, inst.Name, inst.Endpoint)
}
return nil
}

View File

@ -17,6 +17,8 @@ package preheat
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"net/http/httptest"
"testing" "testing"
"time" "time"
@ -26,6 +28,9 @@ import (
"github.com/goharbor/harbor/src/lib/selector" "github.com/goharbor/harbor/src/lib/selector"
ar "github.com/goharbor/harbor/src/pkg/artifact" ar "github.com/goharbor/harbor/src/pkg/artifact"
po "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy" po "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy"
pr "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
v1 "github.com/goharbor/harbor/src/pkg/scan/rest/v1" v1 "github.com/goharbor/harbor/src/pkg/scan/rest/v1"
"github.com/goharbor/harbor/src/pkg/scan/vuln" "github.com/goharbor/harbor/src/pkg/scan/vuln"
ta "github.com/goharbor/harbor/src/pkg/tag/model/tag" ta "github.com/goharbor/harbor/src/pkg/tag/model/tag"
@ -33,6 +38,7 @@ import (
"github.com/goharbor/harbor/src/testing/controller/project" "github.com/goharbor/harbor/src/testing/controller/project"
"github.com/goharbor/harbor/src/testing/controller/scan" "github.com/goharbor/harbor/src/testing/controller/scan"
"github.com/goharbor/harbor/src/testing/mock" "github.com/goharbor/harbor/src/testing/mock"
"github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance"
"github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy" "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy"
"github.com/goharbor/harbor/src/testing/pkg/task" "github.com/goharbor/harbor/src/testing/pkg/task"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -44,6 +50,7 @@ type EnforcerTestSuite struct {
suite.Suite suite.Suite
enforcer *defaultEnforcer enforcer *defaultEnforcer
server *httptest.Server
} }
// TestEnforcer is an entry method of running EnforcerTestSuite // TestEnforcer is an entry method of running EnforcerTestSuite
@ -53,6 +60,12 @@ func TestEnforcer(t *testing.T) {
// SetupSuite prepares env for running EnforcerTestSuite // SetupSuite prepares env for running EnforcerTestSuite
func (suite *EnforcerTestSuite) SetupSuite() { func (suite *EnforcerTestSuite) SetupSuite() {
// Start mock server
suite.server = httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
suite.server.StartTLS()
fakePolicies := mockPolicies() fakePolicies := mockPolicies()
fakePolicyManager := &policy.FakeManager{} fakePolicyManager := &policy.FakeManager{}
fakePolicyManager.On("Get", fakePolicyManager.On("Get",
@ -108,6 +121,20 @@ func (suite *EnforcerTestSuite) SetupSuite() {
CVEAllowlist: models.CVEAllowlist{}, CVEAllowlist: models.CVEAllowlist{},
}, nil) }, nil)
fakeInstanceMgr := &instance.FakeManager{}
fakeInstanceMgr.On("Get",
context.TODO(),
mock.AnythingOfType("int64"),
).Return(&pr.Instance{
ID: 1,
Name: "my_preheat_provider1",
Vendor: provider.DriverKraken,
Endpoint: suite.server.URL,
Status: provider.DriverStatusHealthy,
AuthMode: auth.AuthModeNone,
Insecure: true,
}, nil)
suite.enforcer = &defaultEnforcer{ suite.enforcer = &defaultEnforcer{
policyMgr: fakePolicyManager, policyMgr: fakePolicyManager,
executionMgr: fakeExecManager, executionMgr: fakeExecManager,
@ -115,6 +142,7 @@ func (suite *EnforcerTestSuite) SetupSuite() {
artCtl: fakeArtCtl, artCtl: fakeArtCtl,
scanCtl: fakeScanCtl, scanCtl: fakeScanCtl,
proCtl: fakeProCtl, proCtl: fakeProCtl,
instMgr: fakeInstanceMgr,
fullURLGetter: func(c *selector.Candidate) (s string, e error) { fullURLGetter: func(c *selector.Candidate) (s string, e error) {
r := fmt.Sprintf("%s/%s", c.Namespace, c.Repository) r := fmt.Sprintf("%s/%s", c.Namespace, c.Repository)
return fmt.Sprintf(manifestAPIPattern, "https://testing.harbor.com", r, c.Tags[0]), nil return fmt.Sprintf(manifestAPIPattern, "https://testing.harbor.com", r, c.Tags[0]), nil
@ -123,7 +151,11 @@ func (suite *EnforcerTestSuite) SetupSuite() {
return "fake-token", nil return "fake-token", nil
}, },
} }
}
// TearDownSuite cleans the testing env
func (suite *EnforcerTestSuite) TearDownSuite() {
suite.server.Close()
} }
// TestEnforcePolicy tests the policy enforcement case. // TestEnforcePolicy tests the policy enforcement case.

View File

@ -91,6 +91,23 @@ func (suite *JobTestSuite) TestJobWithDragonflyDriver() {
suite.runJob(ins) suite.runJob(ins)
} }
// TestJobWithKrakenDriver test preheat job running with Kraken driver.
func (suite *JobTestSuite) TestJobWithKrakenDriver() {
ins := &p.Instance{
ID: 2,
Name: "test-instance2",
Vendor: provider.DriverKraken,
Endpoint: suite.kraken.URL,
AuthMode: auth.AuthModeNone,
Enabled: true,
Default: true,
Insecure: true,
Status: provider.DriverStatusHealthy,
}
suite.runJob(ins)
}
func (suite *JobTestSuite) validateJob(j job.Interface, params job.Parameters) { func (suite *JobTestSuite) validateJob(j job.Interface, params job.Parameters) {
require.Equal(suite.T(), uint(1), j.MaxFails(), "max fails") require.Equal(suite.T(), uint(1), j.MaxFails(), "max fails")
require.Equal(suite.T(), false, j.ShouldRetry(), "should retry") require.Equal(suite.T(), false, j.ShouldRetry(), "should retry")

View File

@ -14,5 +14,5 @@ func DragonflyFactory(instance *provider.Instance) (Driver, error) {
// KrakenFactory creates kraken driver // KrakenFactory creates kraken driver
func KrakenFactory(instance *provider.Instance) (Driver, error) { func KrakenFactory(instance *provider.Instance) (Driver, error) {
return &KrakenDriver{instance, nil}, nil return &KrakenDriver{instance}, nil
} }

View File

@ -12,7 +12,6 @@ import (
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/client" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/client"
"github.com/goharbor/harbor/src/pkg/registry"
) )
const ( const (
@ -20,13 +19,10 @@ const (
krakenPreheatPath = "/registry/notifications" krakenPreheatPath = "/registry/notifications"
) )
type digestFetcherFunc func(repoName, tag string) (string, error)
// KrakenDriver implements the provider driver interface for Uber kraken. // KrakenDriver implements the provider driver interface for Uber kraken.
// More details, please refer to https://github.com/uber/kraken // More details, please refer to https://github.com/uber/kraken
type KrakenDriver struct { type KrakenDriver struct {
instance *provider.Instance instance *provider.Instance
digestFetcher digestFetcherFunc
} }
// Self implements @Driver.Self. // Self implements @Driver.Self.
@ -73,20 +69,13 @@ func (kd *KrakenDriver) Preheat(preheatingImage *PreheatImage) (*PreheatingStatu
url := fmt.Sprintf("%s%s", strings.TrimSuffix(kd.instance.Endpoint, "/"), krakenPreheatPath) url := fmt.Sprintf("%s%s", strings.TrimSuffix(kd.instance.Endpoint, "/"), krakenPreheatPath)
var events = make([]cm.Event, 0) var events = make([]cm.Event, 0)
eventID := utils.GenerateRandomString() eventID := utils.GenerateRandomString()
if kd.digestFetcher == nil {
kd.digestFetcher = fetchDigest
}
digest, err := kd.digestFetcher(preheatingImage.ImageName, preheatingImage.Tag)
if err != nil {
return nil, err
}
event := cm.Event{ event := cm.Event{
ID: eventID, ID: eventID,
TimeStamp: time.Now().UTC(), TimeStamp: time.Now().UTC(),
Action: "push", Action: "push",
Target: &cm.Target{ Target: &cm.Target{
MediaType: schema2.MediaTypeManifest, MediaType: schema2.MediaTypeManifest,
Digest: digest, Digest: preheatingImage.Digest,
Repository: preheatingImage.ImageName, Repository: preheatingImage.ImageName,
URL: preheatingImage.URL, URL: preheatingImage.URL,
Tag: preheatingImage.Tag, Tag: preheatingImage.Tag,
@ -96,7 +85,7 @@ func (kd *KrakenDriver) Preheat(preheatingImage *PreheatImage) (*PreheatingStatu
var payload = cm.Notification{ var payload = cm.Notification{
Events: events, Events: events,
} }
_, err = client.GetHTTPClient(kd.instance.Insecure).Post(url, kd.getCred(), payload, nil) _, err := client.GetHTTPClient(kd.instance.Insecure).Post(url, kd.getCred(), payload, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -124,14 +113,3 @@ func (kd *KrakenDriver) getCred() *auth.Credential {
Data: kd.instance.AuthInfo, Data: kd.instance.AuthInfo,
} }
} }
func fetchDigest(repoName, tag string) (string, error) {
exist, digest, err := registry.Cli.ManifestExist(repoName, tag)
if err != nil {
return "", err
}
if !exist {
return "", errors.New("image not found")
}
return digest, nil
}

View File

@ -55,9 +55,6 @@ func (suite *KrakenTestSuite) SetupSuite() {
Insecure: true, Insecure: true,
Status: DriverStatusHealthy, Status: DriverStatusHealthy,
}, },
digestFetcher: func(repoName, tag string) (s string, e error) {
return "image@digest", nil
},
} }
} }
@ -84,6 +81,7 @@ func (suite *KrakenTestSuite) TestPreheat() {
st, err := suite.driver.Preheat(&PreheatImage{ st, err := suite.driver.Preheat(&PreheatImage{
Type: "image", Type: "image",
ImageName: "busybox", ImageName: "busybox",
Digest: "sha256@fake",
Tag: "latest", Tag: "latest",
URL: "https://harbor.com", URL: "https://harbor.com",
}) })

View File

@ -1,21 +1,24 @@
// Code generated by mockery v1.0.0. DO NOT EDIT. // Code generated by mockery v2.0.3. DO NOT EDIT.
package mocks package instance
import ( import (
context "context" context "context"
q "github.com/goharbor/harbor/src/lib/q"
provider "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
provider "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
q "github.com/goharbor/harbor/src/lib/q"
) )
// Manager is an autogenerated mock type for the Manager type // FakeManager is an autogenerated mock type for the Manager type
type Manager struct { type FakeManager struct {
mock.Mock mock.Mock
} }
// Count provides a mock function with given fields: ctx, query // Count provides a mock function with given fields: ctx, query
func (_m *Manager) Count(ctx context.Context, query *q.Query) (int64, error) { func (_m *FakeManager) Count(ctx context.Context, query *q.Query) (int64, error) {
ret := _m.Called(ctx, query) ret := _m.Called(ctx, query)
var r0 int64 var r0 int64
@ -36,7 +39,7 @@ func (_m *Manager) Count(ctx context.Context, query *q.Query) (int64, error) {
} }
// Delete provides a mock function with given fields: ctx, id // Delete provides a mock function with given fields: ctx, id
func (_m *Manager) Delete(ctx context.Context, id int64) error { func (_m *FakeManager) Delete(ctx context.Context, id int64) error {
ret := _m.Called(ctx, id) ret := _m.Called(ctx, id)
var r0 error var r0 error
@ -50,7 +53,7 @@ func (_m *Manager) Delete(ctx context.Context, id int64) error {
} }
// Get provides a mock function with given fields: ctx, id // Get provides a mock function with given fields: ctx, id
func (_m *Manager) Get(ctx context.Context, id int64) (*provider.Instance, error) { func (_m *FakeManager) Get(ctx context.Context, id int64) (*provider.Instance, error) {
ret := _m.Called(ctx, id) ret := _m.Called(ctx, id)
var r0 *provider.Instance var r0 *provider.Instance
@ -73,7 +76,7 @@ func (_m *Manager) Get(ctx context.Context, id int64) (*provider.Instance, error
} }
// List provides a mock function with given fields: ctx, query // List provides a mock function with given fields: ctx, query
func (_m *Manager) List(ctx context.Context, query *q.Query) ([]*provider.Instance, error) { func (_m *FakeManager) List(ctx context.Context, query *q.Query) ([]*provider.Instance, error) {
ret := _m.Called(ctx, query) ret := _m.Called(ctx, query)
var r0 []*provider.Instance var r0 []*provider.Instance
@ -96,7 +99,7 @@ func (_m *Manager) List(ctx context.Context, query *q.Query) ([]*provider.Instan
} }
// Save provides a mock function with given fields: ctx, inst // Save provides a mock function with given fields: ctx, inst
func (_m *Manager) Save(ctx context.Context, inst *provider.Instance) (int64, error) { func (_m *FakeManager) Save(ctx context.Context, inst *provider.Instance) (int64, error) {
ret := _m.Called(ctx, inst) ret := _m.Called(ctx, inst)
var r0 int64 var r0 int64
@ -117,7 +120,7 @@ func (_m *Manager) Save(ctx context.Context, inst *provider.Instance) (int64, er
} }
// Update provides a mock function with given fields: ctx, inst, props // Update provides a mock function with given fields: ctx, inst, props
func (_m *Manager) Update(ctx context.Context, inst *provider.Instance, props ...string) error { func (_m *FakeManager) Update(ctx context.Context, inst *provider.Instance, props ...string) error {
_va := make([]interface{}, len(props)) _va := make([]interface{}, len(props))
for _i := range props { for _i := range props {
_va[_i] = props[_i] _va[_i] = props[_i]