Merge pull request #8624 from wy65701436/quota-migration-dev

Add quota sync api to sync quota data with backend storage
This commit is contained in:
Wang Yan 2019-08-16 17:34:54 +08:00 committed by GitHub
commit 42bb5e1143
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1024 additions and 37 deletions

View File

@ -2409,6 +2409,20 @@ paths:
$ref: '#/responses/UnsupportedMediaType' $ref: '#/responses/UnsupportedMediaType'
'500': '500':
description: Unexpected internal errors. description: Unexpected internal errors.
/internal/syncquota:
post:
summary: Sync quota from registry/chart to DB.
description: |
This endpoint is for syncing quota usage of registry/chart with database.
tags:
- Products
responses:
'200':
description: Sync repositories successfully.
'401':
description: User need to log in first.
'403':
description: User does not have permission of system admin role.
/systeminfo: /systeminfo:
get: get:
summary: Get general system info summary: Get general system info

View File

@ -86,6 +86,7 @@ CREATE TABLE quota_usage
UNIQUE (reference, reference_id) UNIQUE (reference, reference_id)
); );
/* only set quota and usage for 'library', and let the sync quota handling others. */
INSERT INTO quota (reference, reference_id, hard, creation_time, update_time) INSERT INTO quota (reference, reference_id, hard, creation_time, update_time)
SELECT 'project', SELECT 'project',
CAST(project_id AS VARCHAR), CAST(project_id AS VARCHAR),
@ -93,7 +94,7 @@ SELECT 'project',
NOW(), NOW(),
NOW() NOW()
FROM project FROM project
WHERE deleted = 'f'; WHERE name = 'library' and deleted = 'f';
INSERT INTO quota_usage (id, reference, reference_id, used, creation_time, update_time) INSERT INTO quota_usage (id, reference, reference_id, used, creation_time, update_time)
SELECT id, SELECT id,

View File

@ -58,6 +58,7 @@ func UpdateArtifactPullTime(af *models.Artifact) error {
// DeleteArtifact ... // DeleteArtifact ...
func DeleteArtifact(id int64) error { func DeleteArtifact(id int64) error {
_, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete() _, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete()
return err return err
} }

View File

@ -35,6 +35,7 @@ import (
testutils "github.com/goharbor/harbor/src/common/utils/test" testutils "github.com/goharbor/harbor/src/common/utils/test"
api_models "github.com/goharbor/harbor/src/core/api/models" api_models "github.com/goharbor/harbor/src/core/api/models"
apimodels "github.com/goharbor/harbor/src/core/api/models" apimodels "github.com/goharbor/harbor/src/core/api/models"
quota "github.com/goharbor/harbor/src/core/api/quota"
_ "github.com/goharbor/harbor/src/core/auth/db" _ "github.com/goharbor/harbor/src/core/auth/db"
_ "github.com/goharbor/harbor/src/core/auth/ldap" _ "github.com/goharbor/harbor/src/core/auth/ldap"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
@ -203,12 +204,17 @@ func init() {
beego.Router("/api/quotas/:id([0-9]+)", quotaAPIType, "get:Get;put:Put") beego.Router("/api/quotas/:id([0-9]+)", quotaAPIType, "get:Get;put:Put")
beego.Router("/api/internal/switchquota", &InternalAPI{}, "put:SwitchQuota") beego.Router("/api/internal/switchquota", &InternalAPI{}, "put:SwitchQuota")
beego.Router("/api/internal/syncquota", &InternalAPI{}, "post:SyncQuota")
// syncRegistry // syncRegistry
if err := SyncRegistry(config.GlobalProjectMgr); err != nil { if err := SyncRegistry(config.GlobalProjectMgr); err != nil {
log.Fatalf("failed to sync repositories from registry: %v", err) log.Fatalf("failed to sync repositories from registry: %v", err)
} }
if err := quota.Sync(config.GlobalProjectMgr, false); err != nil {
log.Fatalf("failed to sync quota from backend: %v", err)
}
// Init user Info // Init user Info
admin = &usrInfo{adminName, adminPwd} admin = &usrInfo{adminName, adminPwd}
unknownUsr = &usrInfo{"unknown", "unknown"} unknownUsr = &usrInfo{"unknown", "unknown"}

View File

@ -35,7 +35,8 @@ import (
var ( var (
timeout = 60 * time.Second timeout = 60 * time.Second
healthCheckerRegistry = map[string]health.Checker{} // HealthCheckerRegistry ...
HealthCheckerRegistry = map[string]health.Checker{}
) )
type overallHealthStatus struct { type overallHealthStatus struct {
@ -67,11 +68,11 @@ type HealthAPI struct {
func (h *HealthAPI) CheckHealth() { func (h *HealthAPI) CheckHealth() {
var isHealthy healthy = true var isHealthy healthy = true
components := []*componentHealthStatus{} components := []*componentHealthStatus{}
c := make(chan *componentHealthStatus, len(healthCheckerRegistry)) c := make(chan *componentHealthStatus, len(HealthCheckerRegistry))
for name, checker := range healthCheckerRegistry { for name, checker := range HealthCheckerRegistry {
go check(name, checker, timeout, c) go check(name, checker, timeout, c)
} }
for i := 0; i < len(healthCheckerRegistry); i++ { for i := 0; i < len(HealthCheckerRegistry); i++ {
componentStatus := <-c componentStatus := <-c
if len(componentStatus.Error) != 0 { if len(componentStatus.Error) != 0 {
isHealthy = false isHealthy = false
@ -290,21 +291,21 @@ func redisHealthChecker() health.Checker {
} }
func registerHealthCheckers() { func registerHealthCheckers() {
healthCheckerRegistry["core"] = coreHealthChecker() HealthCheckerRegistry["core"] = coreHealthChecker()
healthCheckerRegistry["portal"] = portalHealthChecker() HealthCheckerRegistry["portal"] = portalHealthChecker()
healthCheckerRegistry["jobservice"] = jobserviceHealthChecker() HealthCheckerRegistry["jobservice"] = jobserviceHealthChecker()
healthCheckerRegistry["registry"] = registryHealthChecker() HealthCheckerRegistry["registry"] = registryHealthChecker()
healthCheckerRegistry["registryctl"] = registryCtlHealthChecker() HealthCheckerRegistry["registryctl"] = registryCtlHealthChecker()
healthCheckerRegistry["database"] = databaseHealthChecker() HealthCheckerRegistry["database"] = databaseHealthChecker()
healthCheckerRegistry["redis"] = redisHealthChecker() HealthCheckerRegistry["redis"] = redisHealthChecker()
if config.WithChartMuseum() { if config.WithChartMuseum() {
healthCheckerRegistry["chartmuseum"] = chartmuseumHealthChecker() HealthCheckerRegistry["chartmuseum"] = chartmuseumHealthChecker()
} }
if config.WithClair() { if config.WithClair() {
healthCheckerRegistry["clair"] = clairHealthChecker() HealthCheckerRegistry["clair"] = clairHealthChecker()
} }
if config.WithNotary() { if config.WithNotary() {
healthCheckerRegistry["notary"] = notaryHealthChecker() HealthCheckerRegistry["notary"] = notaryHealthChecker()
} }
} }

View File

@ -92,9 +92,9 @@ func fakeHealthChecker(healthy bool) health.Checker {
} }
func TestCheckHealth(t *testing.T) { func TestCheckHealth(t *testing.T) {
// component01: healthy, component02: healthy => status: healthy // component01: healthy, component02: healthy => status: healthy
healthCheckerRegistry = map[string]health.Checker{} HealthCheckerRegistry = map[string]health.Checker{}
healthCheckerRegistry["component01"] = fakeHealthChecker(true) HealthCheckerRegistry["component01"] = fakeHealthChecker(true)
healthCheckerRegistry["component02"] = fakeHealthChecker(true) HealthCheckerRegistry["component02"] = fakeHealthChecker(true)
status := map[string]interface{}{} status := map[string]interface{}{}
err := handleAndParse(&testingRequest{ err := handleAndParse(&testingRequest{
method: http.MethodGet, method: http.MethodGet,
@ -104,9 +104,9 @@ func TestCheckHealth(t *testing.T) {
assert.Equal(t, "healthy", status["status"].(string)) assert.Equal(t, "healthy", status["status"].(string))
// component01: healthy, component02: unhealthy => status: unhealthy // component01: healthy, component02: unhealthy => status: unhealthy
healthCheckerRegistry = map[string]health.Checker{} HealthCheckerRegistry = map[string]health.Checker{}
healthCheckerRegistry["component01"] = fakeHealthChecker(true) HealthCheckerRegistry["component01"] = fakeHealthChecker(true)
healthCheckerRegistry["component02"] = fakeHealthChecker(false) HealthCheckerRegistry["component02"] = fakeHealthChecker(false)
status = map[string]interface{}{} status = map[string]interface{}{}
err = handleAndParse(&testingRequest{ err = handleAndParse(&testingRequest{
method: http.MethodGet, method: http.MethodGet,
@ -128,7 +128,7 @@ func TestDatabaseHealthChecker(t *testing.T) {
} }
func TestRegisterHealthCheckers(t *testing.T) { func TestRegisterHealthCheckers(t *testing.T) {
healthCheckerRegistry = map[string]health.Checker{} HealthCheckerRegistry = map[string]health.Checker{}
registerHealthCheckers() registerHealthCheckers()
assert.NotNil(t, healthCheckerRegistry["core"]) assert.NotNil(t, HealthCheckerRegistry["core"])
} }

View File

@ -19,12 +19,17 @@ import (
"github.com/goharbor/harbor/src/common" "github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota" common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/pkg/errors" "github.com/pkg/errors"
"strconv" "strconv"
quota "github.com/goharbor/harbor/src/core/api/quota"
comcfg "github.com/goharbor/harbor/src/common/config"
) )
// InternalAPI handles request of harbor admin... // InternalAPI handles request of harbor admin...
@ -132,14 +137,14 @@ func (ia *InternalAPI) ensureQuota() error {
pCount = pCount + int64(count) pCount = pCount + int64(count)
} }
quotaMgr, err := quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10)) quotaMgr, err := common_quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10))
if err != nil { if err != nil {
logger.Errorf("Error occurred when to new quota manager %v, just skip it.", err) logger.Errorf("Error occurred when to new quota manager %v, just skip it.", err)
continue continue
} }
used := quota.ResourceList{ used := common_quota.ResourceList{
quota.ResourceStorage: pSize, common_quota.ResourceStorage: pSize,
quota.ResourceCount: pCount, common_quota.ResourceCount: pCount,
} }
if err := quotaMgr.EnsureQuota(used); err != nil { if err := quotaMgr.EnsureQuota(used); err != nil {
logger.Errorf("cannot ensure quota for the project: %d, err: %v, just skip it.", project.ProjectID, err) logger.Errorf("cannot ensure quota for the project: %d, err: %v, just skip it.", project.ProjectID, err)
@ -148,3 +153,28 @@ func (ia *InternalAPI) ensureQuota() error {
} }
return nil return nil
} }
// SyncQuota ...
func (ia *InternalAPI) SyncQuota() {
cur := config.ReadOnly()
cfgMgr := comcfg.NewDBCfgManager()
if cur != true {
cfgMgr.Set(common.ReadOnly, true)
}
// For api call, to avoid the timeout, it should be asynchronous
go func() {
defer func() {
if cur != true {
cfgMgr.Set(common.ReadOnly, false)
}
}()
log.Info("start to sync quota(API), the system will be set to ReadOnly and back it normal once it done.")
err := quota.Sync(ia.ProjectMgr, false)
if err != nil {
log.Errorf("fail to sync quota(API), but with error: %v, please try to do it again.", err)
return
}
log.Info("success to sync quota(API).")
}()
return
}

View File

@ -54,3 +54,36 @@ func TestSwitchQuota(t *testing.T) {
} }
runCodeCheckingCases(t, cases...) runCodeCheckingCases(t, cases...)
} }
// cannot verify the real scenario here
func TestSyncQuota(t *testing.T) {
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/internal/syncquota",
},
code: http.StatusUnauthorized,
},
// 200
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/internal/syncquota",
credential: sysAdmin,
},
code: http.StatusOK,
},
// 403
{
request: &testingRequest{
url: "/api/internal/syncquota",
method: http.MethodPost,
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
}
runCodeCheckingCases(t, cases...)
}

View File

@ -0,0 +1,226 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chart
import (
"fmt"
"github.com/goharbor/harbor/src/chartserver"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api"
quota "github.com/goharbor/harbor/src/core/api/quota"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr"
"github.com/pkg/errors"
"net/url"
"strings"
"sync"
)
// Migrator ...
type Migrator struct {
pm promgr.ProjectManager
}
// NewChartMigrator returns a new RegistryMigrator.
func NewChartMigrator(pm promgr.ProjectManager) quota.QuotaMigrator {
migrator := Migrator{
pm: pm,
}
return &migrator
}
var (
controller *chartserver.Controller
controllerErr error
controllerOnce sync.Once
)
// Ping ...
func (rm *Migrator) Ping() error {
return api.HealthCheckerRegistry["chartmuseum"].Check()
}
// Dump ...
// Depends on DB to dump chart data, as chart cannot get all of namespaces.
func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) {
var (
projects []quota.ProjectInfo
wg sync.WaitGroup
err error
)
all, err := dao.GetProjects(nil)
if err != nil {
return nil, err
}
wg.Add(len(all))
errChan := make(chan error, 1)
infoChan := make(chan interface{})
done := make(chan bool, 1)
go func() {
defer func() {
done <- true
}()
for {
select {
case result := <-infoChan:
if result == nil {
return
}
project, ok := result.(quota.ProjectInfo)
if ok {
projects = append(projects, project)
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "quota sync error on getting info of project")
} else {
err = errors.Wrap(e, err.Error())
}
}
}
}()
for _, project := range all {
go func(project *models.Project) {
defer wg.Done()
var repos []quota.RepoData
ctr, err := chartController()
if err != nil {
errChan <- err
return
}
chartInfo, err := ctr.ListCharts(project.Name)
if err != nil {
errChan <- err
return
}
// repo
for _, chart := range chartInfo {
var afs []*models.Artifact
chartVersions, err := ctr.GetChart(project.Name, chart.Name)
if err != nil {
errChan <- err
continue
}
for _, chart := range chartVersions {
af := &models.Artifact{
PID: project.ProjectID,
Repo: chart.Name,
Tag: chart.Version,
Digest: chart.Digest,
Kind: "Chart",
}
afs = append(afs, af)
}
repoData := quota.RepoData{
Name: project.Name,
Afs: afs,
}
repos = append(repos, repoData)
}
projectInfo := quota.ProjectInfo{
Name: project.Name,
Repos: repos,
}
infoChan <- projectInfo
}(project)
}
wg.Wait()
close(infoChan)
<-done
if err != nil {
return nil, err
}
return projects, nil
}
// Usage ...
// Chart will not cover size.
func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) {
var pros []quota.ProjectUsage
for _, project := range projects {
var count int64
// usage count
for _, repo := range project.Repos {
count = count + int64(len(repo.Afs))
}
proUsage := quota.ProjectUsage{
Project: project.Name,
Used: common_quota.ResourceList{
common_quota.ResourceCount: count,
common_quota.ResourceStorage: 0,
},
}
pros = append(pros, proUsage)
}
return pros, nil
}
// Persist ...
// Chart will not persist data into db.
func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
return nil
}
func chartController() (*chartserver.Controller, error) {
controllerOnce.Do(func() {
addr, err := config.GetChartMuseumEndpoint()
if err != nil {
controllerErr = fmt.Errorf("failed to get the endpoint URL of chart storage server: %s", err.Error())
return
}
addr = strings.TrimSuffix(addr, "/")
url, err := url.Parse(addr)
if err != nil {
controllerErr = errors.New("endpoint URL of chart storage server is malformed")
return
}
ctr, err := chartserver.NewController(url)
if err != nil {
controllerErr = errors.New("failed to initialize chart API controller")
}
controller = ctr
log.Debugf("Chart storage server is set to %s", url.String())
log.Info("API controller for chart repository server is successfully initialized")
})
return controller, controllerErr
}
func init() {
quota.Register("chart", NewChartMigrator)
}

View File

@ -0,0 +1,173 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr"
"github.com/goharbor/harbor/src/pkg/types"
"strconv"
)
// QuotaMigrator ...
type QuotaMigrator interface {
// Ping validates and wait for backend service ready.
Ping() error
// Dump exports all data from backend service, registry, chartmuseum
Dump() ([]ProjectInfo, error)
// Usage computes the quota usage of all the projects
Usage([]ProjectInfo) ([]ProjectUsage, error)
// Persist record the data to DB, artifact, artifact_blob and blob tabel.
Persist([]ProjectInfo) error
}
// ProjectInfo ...
type ProjectInfo struct {
Name string
Repos []RepoData
}
// RepoData ...
type RepoData struct {
Name string
Afs []*models.Artifact
Afnbs []*models.ArtifactAndBlob
Blobs []*models.Blob
}
// ProjectUsage ...
type ProjectUsage struct {
Project string
Used quota.ResourceList
}
// Instance ...
type Instance func(promgr.ProjectManager) QuotaMigrator
var adapters = make(map[string]Instance)
// Register ...
func Register(name string, adapter Instance) {
if adapter == nil {
panic("quota: Register adapter is nil")
}
if _, ok := adapters[name]; ok {
panic("quota: Register called twice for adapter " + name)
}
adapters[name] = adapter
}
// Sync ...
func Sync(pm promgr.ProjectManager, populate bool) error {
totalUsage := make(map[string][]ProjectUsage)
for name, instanceFunc := range adapters {
if !config.WithChartMuseum() {
if name == "chart" {
continue
}
}
adapter := instanceFunc(pm)
if err := adapter.Ping(); err != nil {
return err
}
data, err := adapter.Dump()
if err != nil {
return err
}
usage, err := adapter.Usage(data)
if err != nil {
return err
}
totalUsage[name] = usage
if populate {
if err := adapter.Persist(data); err != nil {
return err
}
}
}
merged := mergeUsage(totalUsage)
if err := ensureQuota(merged); err != nil {
return err
}
return nil
}
// mergeUsage merges the usage of adapters
func mergeUsage(total map[string][]ProjectUsage) []ProjectUsage {
if !config.WithChartMuseum() {
return total["registry"]
}
regUsgs := total["registry"]
chartUsgs := total["chart"]
var mergedUsage []ProjectUsage
temp := make(map[string]quota.ResourceList)
for _, regUsg := range regUsgs {
_, exist := temp[regUsg.Project]
if !exist {
temp[regUsg.Project] = regUsg.Used
mergedUsage = append(mergedUsage, ProjectUsage{
Project: regUsg.Project,
Used: regUsg.Used,
})
}
}
for _, chartUsg := range chartUsgs {
var usedTemp quota.ResourceList
_, exist := temp[chartUsg.Project]
if !exist {
usedTemp = chartUsg.Used
} else {
usedTemp = types.Add(temp[chartUsg.Project], chartUsg.Used)
}
temp[chartUsg.Project] = usedTemp
mergedUsage = append(mergedUsage, ProjectUsage{
Project: chartUsg.Project,
Used: usedTemp,
})
}
return mergedUsage
}
// ensureQuota updates the quota and quota usage in the data base.
func ensureQuota(usages []ProjectUsage) error {
var pid int64
for _, usage := range usages {
project, err := dao.GetProjectByName(usage.Project)
if err != nil {
log.Error(err)
return err
}
pid = project.ProjectID
quotaMgr, err := quota.NewManager("project", strconv.FormatInt(pid, 10))
if err != nil {
log.Errorf("Error occurred when to new quota manager %v", err)
return err
}
if err := quotaMgr.EnsureQuota(usage.Used); err != nil {
log.Errorf("cannot ensure quota for the project: %d, err: %v", pid, err)
return err
}
}
return nil
}

View File

@ -0,0 +1,433 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/core/api"
quota "github.com/goharbor/harbor/src/core/api/quota"
"github.com/goharbor/harbor/src/core/promgr"
coreutils "github.com/goharbor/harbor/src/core/utils"
"github.com/pkg/errors"
"strings"
"sync"
"time"
)
// Migrator ...
type Migrator struct {
pm promgr.ProjectManager
}
// NewRegistryMigrator returns a new Migrator.
func NewRegistryMigrator(pm promgr.ProjectManager) quota.QuotaMigrator {
migrator := Migrator{
pm: pm,
}
return &migrator
}
// Ping ...
func (rm *Migrator) Ping() error {
return api.HealthCheckerRegistry["registry"].Check()
}
// Dump ...
func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) {
var (
projects []quota.ProjectInfo
wg sync.WaitGroup
err error
)
reposInRegistry, err := api.Catalog()
if err != nil {
return nil, err
}
// repoMap : map[project_name : []repo list]
repoMap := make(map[string][]string)
for _, item := range reposInRegistry {
projectName := strings.Split(item, "/")[0]
pro, err := rm.pm.Get(projectName)
if err != nil {
log.Errorf("failed to get project %s: %v", projectName, err)
continue
}
_, exist := repoMap[pro.Name]
if !exist {
repoMap[pro.Name] = []string{item}
} else {
repos := repoMap[pro.Name]
repos = append(repos, item)
repoMap[pro.Name] = repos
}
}
wg.Add(len(repoMap))
errChan := make(chan error, 1)
infoChan := make(chan interface{})
done := make(chan bool, 1)
go func() {
defer func() {
done <- true
}()
for {
select {
case result := <-infoChan:
if result == nil {
return
}
project, ok := result.(quota.ProjectInfo)
if ok {
projects = append(projects, project)
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "quota sync error on getting info of project")
} else {
err = errors.Wrap(e, err.Error())
}
}
}
}()
for project, repos := range repoMap {
go func(project string, repos []string) {
defer wg.Done()
info, err := infoOfProject(project, repos)
if err != nil {
errChan <- err
return
}
infoChan <- info
}(project, repos)
}
wg.Wait()
close(infoChan)
// wait for all of project info
<-done
if err != nil {
return nil, err
}
return projects, nil
}
// Usage ...
// registry needs to merge the shard blobs of different repositories.
func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) {
var pros []quota.ProjectUsage
for _, project := range projects {
var size, count int64
var blobs = make(map[string]int64)
// usage count
for _, repo := range project.Repos {
count = count + int64(len(repo.Afs))
// Because that there are some shared blobs between repositories, it needs to remove the duplicate items.
for _, blob := range repo.Blobs {
_, exist := blobs[blob.Digest]
if !exist {
blobs[blob.Digest] = blob.Size
}
}
}
// size
for _, item := range blobs {
size = size + item
}
proUsage := quota.ProjectUsage{
Project: project.Name,
Used: common_quota.ResourceList{
common_quota.ResourceCount: count,
common_quota.ResourceStorage: size,
},
}
pros = append(pros, proUsage)
}
return pros, nil
}
// Persist ...
func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
for _, project := range projects {
for _, repo := range project.Repos {
if err := persistAf(repo.Afs); err != nil {
return err
}
if err := persistAfnbs(repo.Afnbs); err != nil {
return err
}
if err := persistBlob(repo.Blobs); err != nil {
return err
}
}
}
if err := persistPB(projects); err != nil {
return err
}
return nil
}
func persistAf(afs []*models.Artifact) error {
if len(afs) != 0 {
for _, af := range afs {
_, err := dao.AddArtifact(af)
if err != nil {
if err == dao.ErrDupRows {
continue
}
log.Error(err)
return err
}
}
}
return nil
}
func persistAfnbs(afnbs []*models.ArtifactAndBlob) error {
if len(afnbs) != 0 {
for _, afnb := range afnbs {
_, err := dao.AddArtifactNBlob(afnb)
if err != nil {
if err == dao.ErrDupRows {
continue
}
log.Error(err)
return err
}
}
}
return nil
}
func persistBlob(blobs []*models.Blob) error {
if len(blobs) != 0 {
for _, blob := range blobs {
_, err := dao.AddBlob(blob)
if err != nil {
if err == dao.ErrDupRows {
continue
}
log.Error(err)
return err
}
}
}
return nil
}
func persistPB(projects []quota.ProjectInfo) error {
for _, project := range projects {
var blobs = make(map[string]int64)
var blobsOfPro []*models.Blob
for _, repo := range project.Repos {
for _, blob := range repo.Blobs {
_, exist := blobs[blob.Digest]
if exist {
continue
}
blobs[blob.Digest] = blob.Size
blobInDB, err := dao.GetBlob(blob.Digest)
if err != nil {
log.Error(err)
return err
}
if blobInDB != nil {
blobsOfPro = append(blobsOfPro, blobInDB)
}
}
}
pro, err := dao.GetProjectByName(project.Name)
if err != nil {
log.Error(err)
return err
}
_, err = dao.AddBlobsToProject(pro.ProjectID, blobsOfPro...)
if err != nil {
log.Error(err)
return err
}
}
return nil
}
func infoOfProject(project string, repoList []string) (quota.ProjectInfo, error) {
var (
repos []quota.RepoData
wg sync.WaitGroup
err error
)
wg.Add(len(repoList))
errChan := make(chan error, 1)
infoChan := make(chan interface{})
done := make(chan bool, 1)
pro, err := dao.GetProjectByName(project)
if err != nil {
log.Error(err)
return quota.ProjectInfo{}, err
}
go func() {
defer func() {
done <- true
}()
for {
select {
case result := <-infoChan:
if result == nil {
return
}
repoData, ok := result.(quota.RepoData)
if ok {
repos = append(repos, repoData)
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "quota sync error on getting info of repo")
} else {
err = errors.Wrap(e, err.Error())
}
}
}
}()
for _, repo := range repoList {
go func(pid int64, repo string) {
defer func() {
wg.Done()
}()
info, err := infoOfRepo(pid, repo)
if err != nil {
errChan <- err
return
}
infoChan <- info
}(pro.ProjectID, repo)
}
wg.Wait()
close(infoChan)
<-done
if err != nil {
return quota.ProjectInfo{}, err
}
return quota.ProjectInfo{
Name: project,
Repos: repos,
}, nil
}
func infoOfRepo(pid int64, repo string) (quota.RepoData, error) {
repoClient, err := coreutils.NewRepositoryClientForUI("harbor-core", repo)
if err != nil {
return quota.RepoData{}, err
}
tags, err := repoClient.ListTag()
if err != nil {
return quota.RepoData{}, err
}
var afnbs []*models.ArtifactAndBlob
var afs []*models.Artifact
var blobs []*models.Blob
for _, tag := range tags {
_, mediaType, payload, err := repoClient.PullManifest(tag, []string{
schema1.MediaTypeManifest,
schema1.MediaTypeSignedManifest,
schema2.MediaTypeManifest,
})
if err != nil {
log.Error(err)
return quota.RepoData{}, err
}
manifest, desc, err := registry.UnMarshal(mediaType, payload)
if err != nil {
log.Error(err)
return quota.RepoData{}, err
}
// self
afnb := &models.ArtifactAndBlob{
DigestAF: desc.Digest.String(),
DigestBlob: desc.Digest.String(),
}
afnbs = append(afnbs, afnb)
// add manifest as a blob.
blob := &models.Blob{
Digest: desc.Digest.String(),
ContentType: desc.MediaType,
Size: desc.Size,
CreationTime: time.Now(),
}
blobs = append(blobs, blob)
for _, layer := range manifest.References() {
afnb := &models.ArtifactAndBlob{
DigestAF: desc.Digest.String(),
DigestBlob: layer.Digest.String(),
}
afnbs = append(afnbs, afnb)
blob := &models.Blob{
Digest: layer.Digest.String(),
ContentType: layer.MediaType,
Size: layer.Size,
CreationTime: time.Now(),
}
blobs = append(blobs, blob)
}
af := &models.Artifact{
PID: pid,
Repo: strings.Split(repo, "/")[1],
Tag: tag,
Digest: desc.Digest.String(),
Kind: "Docker-Image",
CreationTime: time.Now(),
}
afs = append(afs, af)
}
return quota.RepoData{
Name: repo,
Afs: afs,
Afnbs: afnbs,
Blobs: blobs,
}, nil
}
func init() {
quota.Register("registry", NewRegistryMigrator)
}

View File

@ -38,7 +38,7 @@ func SyncRegistry(pm promgr.ProjectManager) error {
log.Infof("Start syncing repositories from registry to DB... ") log.Infof("Start syncing repositories from registry to DB... ")
reposInRegistry, err := catalog() reposInRegistry, err := Catalog()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return err return err
@ -105,7 +105,8 @@ func SyncRegistry(pm promgr.ProjectManager) error {
return nil return nil
} }
func catalog() ([]string, error) { // Catalog ...
func Catalog() ([]string, error) {
repositories := []string{} repositories := []string{}
rc, err := initRegistryClient() rc, err := initRegistryClient()

View File

@ -17,16 +17,12 @@ package main
import ( import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"os"
"os/signal"
"strconv"
"syscall"
"github.com/astaxie/beego" "github.com/astaxie/beego"
_ "github.com/astaxie/beego/session/redis" _ "github.com/astaxie/beego/session/redis"
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/models"
common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/core/api"
@ -34,6 +30,15 @@ import (
_ "github.com/goharbor/harbor/src/core/auth/db" _ "github.com/goharbor/harbor/src/core/auth/db"
_ "github.com/goharbor/harbor/src/core/auth/ldap" _ "github.com/goharbor/harbor/src/core/auth/ldap"
_ "github.com/goharbor/harbor/src/core/auth/uaa" _ "github.com/goharbor/harbor/src/core/auth/uaa"
"os"
"os/signal"
"strconv"
"syscall"
quota "github.com/goharbor/harbor/src/core/api/quota"
_ "github.com/goharbor/harbor/src/core/api/quota/chart"
_ "github.com/goharbor/harbor/src/core/api/quota/registry"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/filter" "github.com/goharbor/harbor/src/core/filter"
"github.com/goharbor/harbor/src/core/middlewares" "github.com/goharbor/harbor/src/core/middlewares"
@ -41,6 +46,7 @@ import (
"github.com/goharbor/harbor/src/core/service/token" "github.com/goharbor/harbor/src/core/service/token"
"github.com/goharbor/harbor/src/pkg/notification" "github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/types"
"github.com/goharbor/harbor/src/replication" "github.com/goharbor/harbor/src/replication"
) )
@ -74,6 +80,64 @@ func updateInitPassword(userID int, password string) error {
return nil return nil
} }
// Quota migration
func quotaSync() error {
usages, err := dao.ListQuotaUsages()
if err != nil {
log.Errorf("list quota usage error, %v", err)
return err
}
projects, err := dao.GetProjects(nil)
if err != nil {
log.Errorf("list project error, %v", err)
return err
}
// The condition handles these two cases:
// 1, len(project) > 1 && len(usages) == 1. existing projects without usage, as we do always has 'library' usage in DB.
// 2, migration fails at the phase of inserting usage into DB, and parts of them are inserted successfully.
if len(projects) != len(usages) {
log.Info("Start to sync quota data .....")
if err := quota.Sync(config.GlobalProjectMgr, true); err != nil {
log.Errorf("Fail to sync quota data, %v", err)
return err
}
log.Info("Success to sync quota data .....")
return nil
}
// Only has one project without usage
zero := common_quota.ResourceList{
common_quota.ResourceCount: 0,
common_quota.ResourceStorage: 0,
}
if len(projects) == 1 && len(usages) == 1 {
totalRepo, err := dao.GetTotalOfRepositories()
if totalRepo == 0 {
return nil
}
refID, err := strconv.ParseInt(usages[0].ReferenceID, 10, 64)
if err != nil {
log.Error(err)
return err
}
usedRes, err := types.NewResourceList(usages[0].Used)
if err != nil {
log.Error(err)
return err
}
if types.Equals(usedRes, zero) && refID == projects[0].ProjectID {
log.Info("Start to sync quota data .....")
if err := quota.Sync(config.GlobalProjectMgr, true); err != nil {
log.Errorf("Fail to sync quota data, %v", err)
return err
}
log.Info("Success to sync quota data .....")
}
}
return nil
}
func gracefulShutdown(closing chan struct{}) { func gracefulShutdown(closing chan struct{}) {
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
@ -174,6 +238,9 @@ func main() {
log.Fatalf("init proxy error, %v", err) log.Fatalf("init proxy error, %v", err)
} }
// go proxy.StartProxy() if err := quotaSync(); err != nil {
log.Fatalf("quota migration error, %v", err)
}
beego.Run() beego.Run()
} }

View File

@ -135,6 +135,7 @@ func initRouters() {
beego.Router("/api/internal/syncregistry", &api.InternalAPI{}, "post:SyncRegistry") beego.Router("/api/internal/syncregistry", &api.InternalAPI{}, "post:SyncRegistry")
beego.Router("/api/internal/renameadmin", &api.InternalAPI{}, "post:RenameAdmin") beego.Router("/api/internal/renameadmin", &api.InternalAPI{}, "post:RenameAdmin")
beego.Router("/api/internal/switchquota", &api.InternalAPI{}, "put:SwitchQuota") beego.Router("/api/internal/switchquota", &api.InternalAPI{}, "put:SwitchQuota")
beego.Router("/api/internal/syncquota", &api.InternalAPI{}, "post:SyncQuota")
// external service that hosted on harbor process: // external service that hosted on harbor process:
beego.Router("/service/notifications", &registry.NotificationHandler{}) beego.Router("/service/notifications", &registry.NotificationHandler{})