diff --git a/src/common/dao/artifact.go b/src/common/dao/artifact.go index bac77d74b..34663b5cd 100644 --- a/src/common/dao/artifact.go +++ b/src/common/dao/artifact.go @@ -58,6 +58,7 @@ func UpdateArtifactPullTime(af *models.Artifact) error { // DeleteArtifact ... func DeleteArtifact(id int64) error { + _, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete() return err } diff --git a/src/core/api/internal.go b/src/core/api/internal.go index 08fa377ed..5e4b621dc 100644 --- a/src/core/api/internal.go +++ b/src/core/api/internal.go @@ -19,12 +19,15 @@ import ( "github.com/goharbor/harbor/src/common" "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/quota" + common_quota "github.com/goharbor/harbor/src/common/quota" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/pkg/errors" "strconv" + + quota "github.com/goharbor/harbor/src/core/api/quota" ) // InternalAPI handles request of harbor admin... @@ -132,14 +135,14 @@ func (ia *InternalAPI) ensureQuota() error { pCount = pCount + int64(count) } - quotaMgr, err := quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10)) + quotaMgr, err := common_quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10)) if err != nil { logger.Errorf("Error occurred when to new quota manager %v, just skip it.", err) continue } - used := quota.ResourceList{ - quota.ResourceStorage: pSize, - quota.ResourceCount: pCount, + used := common_quota.ResourceList{ + common_quota.ResourceStorage: pSize, + common_quota.ResourceCount: pCount, } if err := quotaMgr.EnsureQuota(used); err != nil { logger.Errorf("cannot ensure quota for the project: %d, err: %v, just skip it.", project.ProjectID, err) @@ -148,3 +151,12 @@ func (ia *InternalAPI) ensureQuota() error { } return nil } + +// SyncQuota ... +func (ia *InternalAPI) SyncQuota() { + err := quota.Sync(ia.ProjectMgr, false) + if err != nil { + ia.SendInternalServerError(err) + return + } +} diff --git a/src/core/api/quota/chart/chart.go b/src/core/api/quota/chart/chart.go new file mode 100644 index 000000000..438fc6beb --- /dev/null +++ b/src/core/api/quota/chart/chart.go @@ -0,0 +1,217 @@ +// Copyright 2018 Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chart + +import ( + "fmt" + "github.com/goharbor/harbor/src/chartserver" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + common_quota "github.com/goharbor/harbor/src/common/quota" + "github.com/goharbor/harbor/src/common/utils/log" + quota "github.com/goharbor/harbor/src/core/api/quota" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/promgr" + "github.com/pkg/errors" + "net/url" + "strings" + "sync" +) + +// Migrator ... +type Migrator struct { + pm promgr.ProjectManager +} + +// NewChartMigrator returns a new RegistryMigrator. +func NewChartMigrator(pm promgr.ProjectManager) quota.QuotaMigrator { + migrator := Migrator{ + pm: pm, + } + return &migrator +} + +var ( + controller *chartserver.Controller + controllerErr error + controllerOnce sync.Once +) + +// Dump ... +// Depends on DB to dump chart data, as chart cannot get all of namespaces. +func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) { + var ( + projects []quota.ProjectInfo + wg sync.WaitGroup + err error + ) + + all, err := dao.GetProjects(nil) + if err != nil { + return nil, err + } + + wg.Add(len(all)) + errChan := make(chan error, 1) + infoChan := make(chan interface{}) + done := make(chan bool, 1) + + go func() { + defer func() { + done <- true + }() + + for { + select { + case result := <-infoChan: + if result == nil { + return + } + project, ok := result.(quota.ProjectInfo) + if ok { + projects = append(projects, project) + } + + case e := <-errChan: + if err == nil { + err = errors.Wrap(e, "quota sync error on getting info of project") + } else { + err = errors.Wrap(e, err.Error()) + } + } + } + }() + + for _, project := range all { + go func(project *models.Project) { + defer wg.Done() + + var repos []quota.RepoData + var afs []*models.Artifact + + ctr, err := chartController() + if err != nil { + log.Error(err) + } + + chartInfo, err := ctr.ListCharts(project.Name) + if err != nil { + log.Error(err) + } + + // repo + for _, chart := range chartInfo { + chartVersions, err := ctr.GetChart(project.Name, chart.Name) + if err != nil { + log.Error(err) + } + for _, chart := range chartVersions { + af := &models.Artifact{ + PID: project.ProjectID, + Repo: chart.Name, + Tag: chart.Version, + Digest: chart.Digest, + Kind: "Chart", + } + afs = append(afs, af) + } + repoData := quota.RepoData{ + Name: project.Name, + Afs: afs, + } + repos = append(repos, repoData) + } + + projectInfo := quota.ProjectInfo{ + Name: project.Name, + Repos: repos, + } + + infoChan <- projectInfo + }(project) + } + + wg.Done() + close(infoChan) + + <-done + + if err != nil { + return nil, err + } + + return projects, nil +} + +// Usage ... +// Chart will not cover size. +func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) { + var pros []quota.ProjectUsage + for _, project := range projects { + var count int64 + // usage count + for _, repo := range project.Repos { + count = count + int64(len(repo.Afs)) + } + proUsage := quota.ProjectUsage{ + Project: project.Name, + Used: common_quota.ResourceList{ + common_quota.ResourceCount: count, + }, + } + pros = append(pros, proUsage) + } + return pros, nil + +} + +// Persist ... +// Chart will not persist data into db. +func (rm *Migrator) Persist(projects []quota.ProjectInfo) error { + return nil +} + +func chartController() (*chartserver.Controller, error) { + controllerOnce.Do(func() { + addr, err := config.GetChartMuseumEndpoint() + if err != nil { + controllerErr = fmt.Errorf("failed to get the endpoint URL of chart storage server: %s", err.Error()) + return + } + + addr = strings.TrimSuffix(addr, "/") + url, err := url.Parse(addr) + if err != nil { + controllerErr = errors.New("endpoint URL of chart storage server is malformed") + return + } + + ctr, err := chartserver.NewController(url) + if err != nil { + controllerErr = errors.New("failed to initialize chart API controller") + } + + controller = ctr + + log.Debugf("Chart storage server is set to %s", url.String()) + log.Info("API controller for chart repository server is successfully initialized") + }) + + return controller, controllerErr +} + +func init() { + quota.Register("chart", NewChartMigrator) +} diff --git a/src/core/api/quota/migrator.go b/src/core/api/quota/migrator.go new file mode 100644 index 000000000..dfab9ce41 --- /dev/null +++ b/src/core/api/quota/migrator.go @@ -0,0 +1,128 @@ +// Copyright 2018 Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package models + +import ( + "fmt" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/quota" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/promgr" + "strconv" +) + +// QuotaMigrator ... +type QuotaMigrator interface { + // Dump exports all data from backend service, registry, chartmuseum + Dump() ([]ProjectInfo, error) + + // Usage computes the quota usage of all the projects + Usage([]ProjectInfo) ([]ProjectUsage, error) + + // Persist record the data to DB, artifact, artifact_blob and blob tabel. + Persist([]ProjectInfo) error +} + +// ProjectInfo ... +type ProjectInfo struct { + Name string + Repos []RepoData +} + +// RepoData ... +type RepoData struct { + Name string + Afs []*models.Artifact + Afnbs []*models.ArtifactAndBlob + Blobs []*models.Blob +} + +// ProjectUsage ... +type ProjectUsage struct { + Project string + Used quota.ResourceList +} + +// Instance ... +type Instance func(promgr.ProjectManager) QuotaMigrator + +var adapters = make(map[string]Instance) + +// Register ... +func Register(name string, adapter Instance) { + if adapter == nil { + panic("quota: Register adapter is nil") + } + if _, ok := adapters[name]; ok { + panic("quota: Register called twice for adapter " + name) + } + adapters[name] = adapter +} + +// Sync ... +func Sync(pm promgr.ProjectManager, populate bool) error { + for name := range adapters { + if !config.WithChartMuseum() { + if name == "chart" { + continue + } + } + instanceFunc, ok := adapters[name] + if !ok { + err := fmt.Errorf("quota migtator: unknown adapter name %q", name) + return err + } + adapter := instanceFunc(pm) + data, err := adapter.Dump() + if err != nil { + return err + } + usage, err := adapter.Usage(data) + if err := ensureQuota(usage); err != nil { + return err + } + if populate { + if err := adapter.Persist(data); err != nil { + return err + } + } + } + return nil +} + +// ensureQuota updates the quota and quota usage in the data base. +func ensureQuota(usages []ProjectUsage) error { + var pid int64 + for _, usage := range usages { + project, err := dao.GetProjectByName(usage.Project) + if err != nil { + log.Error(err) + return err + } + pid = project.ProjectID + quotaMgr, err := quota.NewManager("project", strconv.FormatInt(pid, 10)) + if err != nil { + log.Errorf("Error occurred when to new quota manager %v", err) + return err + } + if err := quotaMgr.EnsureQuota(usage.Used); err != nil { + log.Errorf("cannot ensure quota for the project: %d, err: %v", pid, err) + return err + } + } + return nil +} diff --git a/src/core/api/quota/registry/registry.go b/src/core/api/quota/registry/registry.go new file mode 100644 index 000000000..19ad7aafc --- /dev/null +++ b/src/core/api/quota/registry/registry.go @@ -0,0 +1,420 @@ +// Copyright 2018 Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + common_quota "github.com/goharbor/harbor/src/common/quota" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/common/utils/registry" + "github.com/goharbor/harbor/src/core/api" + quota "github.com/goharbor/harbor/src/core/api/quota" + "github.com/goharbor/harbor/src/core/promgr" + coreutils "github.com/goharbor/harbor/src/core/utils" + "github.com/pkg/errors" + "strings" + "sync" + "time" +) + +// Migrator ... +type Migrator struct { + pm promgr.ProjectManager +} + +// NewRegistryMigrator returns a new Migrator. +func NewRegistryMigrator(pm promgr.ProjectManager) quota.QuotaMigrator { + migrator := Migrator{ + pm: pm, + } + return &migrator +} + +// Dump ... +func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) { + var ( + projects []quota.ProjectInfo + wg sync.WaitGroup + err error + ) + + reposInRegistry, err := api.Catalog() + if err != nil { + return nil, err + } + + // repoMap : map[project_name : []repo list] + repoMap := make(map[string][]string) + for _, item := range reposInRegistry { + projectName := strings.Split(item, "/")[0] + pro, err := rm.pm.Get(projectName) + if err != nil { + log.Errorf("failed to get project %s: %v", projectName, err) + continue + } + _, exist := repoMap[pro.Name] + if !exist { + repoMap[pro.Name] = []string{item} + } else { + repos := repoMap[pro.Name] + repos = append(repos, item) + repoMap[pro.Name] = repos + } + } + + wg.Add(len(repoMap)) + errChan := make(chan error, 1) + infoChan := make(chan interface{}) + done := make(chan bool, 1) + + go func() { + defer func() { + done <- true + }() + + for { + select { + case result := <-infoChan: + if result == nil { + return + } + project, ok := result.(quota.ProjectInfo) + if ok { + projects = append(projects, project) + } + + case e := <-errChan: + if err == nil { + err = errors.Wrap(e, "quota sync error on getting info of project") + } else { + err = errors.Wrap(e, err.Error()) + } + } + } + }() + + for project, repos := range repoMap { + go func(project string, repos []string) { + defer wg.Done() + info, err := infoOfProject(project, repos) + if err != nil { + errChan <- err + return + } + infoChan <- info + }(project, repos) + } + + wg.Wait() + close(infoChan) + + // wait for all of project info + <-done + + if err != nil { + return nil, err + } + + return projects, nil +} + +// Usage ... +// registry needs to merge the shard blobs of different repositories. +func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) { + var pros []quota.ProjectUsage + + for _, project := range projects { + var size, count int64 + var blobs = make(map[string]int64) + + // usage count + for _, repo := range project.Repos { + count = count + int64(len(repo.Afs)) + // Because that there are some shared blobs between repositories, it needs to remove the duplicate items. + for _, blob := range repo.Blobs { + _, exist := blobs[blob.Digest] + if !exist { + blobs[blob.Digest] = blob.Size + } + } + } + // size + for _, item := range blobs { + size = size + item + } + + proUsage := quota.ProjectUsage{ + Project: project.Name, + Used: common_quota.ResourceList{ + common_quota.ResourceCount: count, + common_quota.ResourceStorage: size, + }, + } + pros = append(pros, proUsage) + } + + return pros, nil +} + +// Persist ... +func (rm *Migrator) Persist(projects []quota.ProjectInfo) error { + for _, project := range projects { + for _, repo := range project.Repos { + if err := persistAf(repo.Afs); err != nil { + return err + } + if err := persistAfnbs(repo.Afnbs); err != nil { + return err + } + if err := persistBlob(repo.Blobs); err != nil { + return err + } + } + } + if err := persistPB(projects); err != nil { + return err + } + return nil +} + +func persistAf(afs []*models.Artifact) error { + if len(afs) != 0 { + for _, af := range afs { + _, err := dao.AddArtifact(af) + if err != nil { + if err == dao.ErrDupRows { + continue + } + log.Error(err) + return err + } + } + } + return nil +} + +func persistAfnbs(afnbs []*models.ArtifactAndBlob) error { + if len(afnbs) != 0 { + for _, afnb := range afnbs { + _, err := dao.AddArtifactNBlob(afnb) + if err != nil { + if err == dao.ErrDupRows { + continue + } + log.Error(err) + return err + } + } + } + return nil +} + +func persistBlob(blobs []*models.Blob) error { + if len(blobs) != 0 { + for _, blob := range blobs { + _, err := dao.AddBlob(blob) + if err != nil { + if err == dao.ErrDupRows { + continue + } + log.Error(err) + return err + } + } + } + return nil +} + +func persistPB(projects []quota.ProjectInfo) error { + for _, project := range projects { + var blobs = make(map[string]int64) + var blobsOfPro []*models.Blob + for _, repo := range project.Repos { + for _, blob := range repo.Blobs { + _, exist := blobs[blob.Digest] + if exist { + continue + } + blobs[blob.Digest] = blob.Size + blobInDB, err := dao.GetBlob(blob.Digest) + if err != nil { + log.Error(err) + return err + } + if blobInDB != nil { + blobsOfPro = append(blobsOfPro, blobInDB) + } + } + } + pro, err := dao.GetProjectByName(project.Name) + if err != nil { + log.Error(err) + return err + } + _, err = dao.AddBlobsToProject(pro.ProjectID, blobsOfPro...) + if err != nil { + log.Error(err) + return err + } + } + return nil +} + +func infoOfProject(project string, repoList []string) (quota.ProjectInfo, error) { + var ( + repos []quota.RepoData + wg sync.WaitGroup + err error + ) + wg.Add(len(repoList)) + + errChan := make(chan error, 1) + infoChan := make(chan interface{}) + done := make(chan bool, 1) + + pro, err := dao.GetProjectByName(project) + if err != nil { + log.Error(err) + return quota.ProjectInfo{}, err + } + + go func() { + defer func() { + done <- true + }() + + for { + select { + case result := <-infoChan: + if result == nil { + return + } + repoData, ok := result.(quota.RepoData) + if ok { + repos = append(repos, repoData) + } + + case e := <-errChan: + if err == nil { + err = errors.Wrap(e, "quota sync error on getting info of repo") + } else { + err = errors.Wrap(e, err.Error()) + } + } + } + }() + + for _, repo := range repoList { + go func(pid int64, repo string) { + defer func() { + wg.Done() + }() + info, err := infoOfRepo(pid, repo) + if err != nil { + errChan <- err + return + } + infoChan <- info + }(pro.ProjectID, repo) + } + + wg.Wait() + close(infoChan) + + <-done + + if err != nil { + return quota.ProjectInfo{}, err + } + + return quota.ProjectInfo{ + Name: project, + Repos: repos, + }, nil +} + +func infoOfRepo(pid int64, repo string) (quota.RepoData, error) { + repoClient, err := coreutils.NewRepositoryClientForUI("harbor-core", repo) + if err != nil { + return quota.RepoData{}, err + } + tags, err := repoClient.ListTag() + if err != nil { + return quota.RepoData{}, err + } + var afnbs []*models.ArtifactAndBlob + var afs []*models.Artifact + var blobs []*models.Blob + + for _, tag := range tags { + _, mediaType, payload, err := repoClient.PullManifest(tag, []string{ + schema1.MediaTypeManifest, + schema1.MediaTypeSignedManifest, + schema2.MediaTypeManifest, + }) + if err != nil { + log.Error(err) + return quota.RepoData{}, err + } + manifest, desc, err := registry.UnMarshal(mediaType, payload) + if err != nil { + log.Error(err) + return quota.RepoData{}, err + } + // self + afnb := &models.ArtifactAndBlob{ + DigestAF: desc.Digest.String(), + DigestBlob: desc.Digest.String(), + } + afnbs = append(afnbs, afnb) + for _, layer := range manifest.References() { + afnb := &models.ArtifactAndBlob{ + DigestAF: desc.Digest.String(), + DigestBlob: layer.Digest.String(), + } + afnbs = append(afnbs, afnb) + blob := &models.Blob{ + Digest: layer.Digest.String(), + ContentType: layer.MediaType, + Size: layer.Size, + CreationTime: time.Now(), + } + blobs = append(blobs, blob) + } + af := &models.Artifact{ + PID: pid, + Repo: strings.Split(repo, "/")[1], + Tag: tag, + Digest: desc.Digest.String(), + Kind: "Docker-Image", + CreationTime: time.Now(), + } + afs = append(afs, af) + } + return quota.RepoData{ + Name: repo, + Afs: afs, + Afnbs: afnbs, + Blobs: blobs, + }, nil +} + +func init() { + quota.Register("registry", NewRegistryMigrator) +} diff --git a/src/core/api/utils.go b/src/core/api/utils.go index 4b7305524..4fd20d383 100644 --- a/src/core/api/utils.go +++ b/src/core/api/utils.go @@ -38,7 +38,7 @@ func SyncRegistry(pm promgr.ProjectManager) error { log.Infof("Start syncing repositories from registry to DB... ") - reposInRegistry, err := catalog() + reposInRegistry, err := Catalog() if err != nil { log.Error(err) return err @@ -105,7 +105,8 @@ func SyncRegistry(pm promgr.ProjectManager) error { return nil } -func catalog() ([]string, error) { +// Catalog ... +func Catalog() ([]string, error) { repositories := []string{} rc, err := initRegistryClient() diff --git a/src/core/main.go b/src/core/main.go index 6ea199757..9883f9cc8 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -34,6 +34,11 @@ import ( _ "github.com/goharbor/harbor/src/core/auth/db" _ "github.com/goharbor/harbor/src/core/auth/ldap" _ "github.com/goharbor/harbor/src/core/auth/uaa" + + quota "github.com/goharbor/harbor/src/core/api/quota" + _ "github.com/goharbor/harbor/src/core/api/quota/chart" + _ "github.com/goharbor/harbor/src/core/api/quota/registry" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/filter" "github.com/goharbor/harbor/src/core/middlewares" @@ -74,6 +79,32 @@ func updateInitPassword(userID int, password string) error { return nil } +// Quota migration +func quotaSync() error { + usages, err := dao.ListQuotaUsages() + if err != nil { + log.Errorf("list quota usage error, %v", err) + return err + } + projects, err := dao.GetProjects(nil) + if err != nil { + log.Errorf("list project error, %v", err) + return err + } + + // upgrade from old version + if len(projects) > 1 && len(usages) == 1 { + log.Info("Start to sync quota data .....") + if err := quota.Sync(config.GlobalProjectMgr, true); err != nil { + log.Errorf("Error happened when syncing quota usage data, %v", err) + return err + } + log.Info("Success to sync quota data .....") + } + + return nil +} + func gracefulShutdown(closing chan struct{}) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) @@ -174,6 +205,9 @@ func main() { log.Fatalf("init proxy error, %v", err) } - // go proxy.StartProxy() + if err := quotaSync(); err != nil { + log.Fatalf("quota migration error, %v", err) + } + beego.Run() } diff --git a/src/core/router.go b/src/core/router.go index 39729174a..7e01b934e 100755 --- a/src/core/router.go +++ b/src/core/router.go @@ -135,6 +135,7 @@ func initRouters() { beego.Router("/api/internal/syncregistry", &api.InternalAPI{}, "post:SyncRegistry") beego.Router("/api/internal/renameadmin", &api.InternalAPI{}, "post:RenameAdmin") beego.Router("/api/internal/switchquota", &api.InternalAPI{}, "put:SwitchQuota") + beego.Router("/api/internal/syncquota", &api.InternalAPI{}, "post:SyncQuota") // external service that hosted on harbor process: beego.Router("/service/notifications", ®istry.NotificationHandler{})