Migrate artifact data in 2.0

Abstract extra attributes and annotations for artifacts stored in database

Signed-off-by: Wenkai Yin <yinw@vmware.com>
Wenkai Yin 2020-02-24 19:50:02 +08:00
parent 0f5a115a65
commit 4c9b59c904
13 changed files with 278 additions and 75 deletions


@@ -1,4 +1,26 @@
+/*
+table artifact:
+ id                  SERIAL PRIMARY KEY NOT NULL,
+ type                varchar(255) NOT NULL,
+ media_type          varchar(255) NOT NULL,
+ manifest_media_type varchar(255) NOT NULL,
+ project_id          int NOT NULL,
+ repository_id       int NOT NULL,
+ repository_name     varchar(255) NOT NULL,
+ digest              varchar(255) NOT NULL,
+ size                bigint,
+ push_time           timestamp default CURRENT_TIMESTAMP,
+ pull_time           timestamp,
+ extra_attrs         text,
+ annotations         jsonb,
+ CONSTRAINT unique_artifact_2 UNIQUE (repository_id, digest)
+*/
 ALTER TABLE admin_job ADD COLUMN job_parameters varchar(255) Default '';
+/*record the data version to decide whether the data migration should be skipped*/
+ALTER TABLE schema_migrations ADD COLUMN data_version int;
 ALTER TABLE artifact ADD COLUMN repository_id int;
 ALTER TABLE artifact ADD COLUMN media_type varchar(255);
 ALTER TABLE artifact ADD COLUMN manifest_media_type varchar(255);
@@ -101,7 +123,6 @@ CREATE TABLE artifact_trash
     CONSTRAINT unique_artifact_trash UNIQUE (repository_name, digest)
 );
-/* TODO upgrade: how about keep the table "harbor_resource_label" only for helm v2 chart and use the new table for artifact label reference? */
 /* label_reference records the labels added to the artifact */
 CREATE TABLE label_reference (
     id SERIAL PRIMARY KEY NOT NULL,
@@ -114,6 +135,23 @@ CREATE TABLE label_reference (
     CONSTRAINT unique_label_reference UNIQUE (label_id,artifact_id)
 );
+/*move the labels added to tag to artifact*/
+INSERT INTO label_reference (label_id, artifact_id, creation_time, update_time)
+(
+    SELECT label.label_id, repo_tag.artifact_id, label.creation_time, label.update_time
+    FROM harbor_resource_label AS label
+    JOIN (
+        SELECT tag.artifact_id, CONCAT(repository.name, ':', tag.name) as name
+        FROM tag
+        JOIN repository
+        ON tag.repository_id = repository.repository_id
+    ) AS repo_tag
+    ON repo_tag.name = label.resource_name AND label.resource_type = 'i'
+) ON CONFLICT DO NOTHING;
+/*remove the records for images in table 'harbor_resource_label'*/
+DELETE FROM harbor_resource_label WHERE resource_type = 'i';
 /* TODO remove this table after clean up code that related with the old artifact model */
 CREATE TABLE artifact_2
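
Note: the label migration above hinges on a naming convention rather than a foreign key. For image labels, harbor_resource_label.resource_type is 'i' and resource_name holds the tag reference as "<repository>:<tag>", which is why the INSERT ... SELECT joins on CONCAT(repository.name, ':', tag.name). A minimal Go sketch of that key format (the helper is illustrative only, not part of this commit):

    package main

    import "fmt"

    // imageResourceName rebuilds the key stored in harbor_resource_label.resource_name
    // for image labels, e.g. "library/nginx:1.17"; the migration matches this string
    // against CONCAT(repository.name, ':', tag.name) to find the owning artifact.
    func imageResourceName(repository, tag string) string {
        return fmt.Sprintf("%s:%s", repository, tag)
    }

    func main() {
        fmt.Println(imageResourceName("library/nginx", "1.17"))
    }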


@@ -418,7 +418,10 @@ func (c *controller) UpdatePullTime(ctx context.Context, artifactID int64, tagID
     if tag.ArtifactID != artifactID {
         return fmt.Errorf("tag %d isn't attached to artifact %d", tagID, artifactID)
     }
-    if err := c.artMgr.UpdatePullTime(ctx, artifactID, time); err != nil {
+    if err := c.artMgr.Update(ctx, &artifact.Artifact{
+        ID:       artifactID,
+        PullTime: time,
+    }, "PullTime"); err != nil {
         return err
     }
     return c.tagCtl.Update(ctx, tag, "PullTime")


@@ -488,8 +488,8 @@ func (c *controllerTestSuite) TestUpdatePullTime() {
             ArtifactID: 1,
         },
     }, nil)
-    c.artMgr.On("UpdatePullTime").Return(nil)
     c.tagCtl.On("Update").Return(nil)
+    c.artMgr.On("Update").Return(nil)
     err := c.ctl.UpdatePullTime(nil, 1, 1, time.Now())
     c.Require().Nil(err)
     c.artMgr.AssertExpectations(c.T())


@@ -90,33 +90,6 @@ func InitDatabase(database *models.Database) error {
     return nil
 }
-
-// InitAndUpgradeDatabase - init database and upgrade when required
-func InitAndUpgradeDatabase(database *models.Database) error {
-    if err := InitDatabase(database); err != nil {
-        return err
-    }
-    if err := UpgradeSchema(database); err != nil {
-        return err
-    }
-    if err := CheckSchemaVersion(); err != nil {
-        return err
-    }
-    return nil
-}
-
-// CheckSchemaVersion checks that whether the schema version matches with the expected one
-func CheckSchemaVersion() error {
-    version, err := GetSchemaVersion()
-    if err != nil {
-        return err
-    }
-    if version.Version != SchemaVersion {
-        return fmt.Errorf("unexpected database schema version, expected %s, got %s",
-            SchemaVersion, version.Version)
-    }
-    return nil
-}
 
 func getDatabase(database *models.Database) (db Database, err error) {
     switch database.Type {


@@ -16,8 +16,10 @@ package dao
 import (
     "fmt"
+    "github.com/goharbor/harbor/src/common/models"
     "net/url"
     "os"
+    "strconv"
 
     "github.com/astaxie/beego/orm"
     "github.com/golang-migrate/migrate"
@@ -92,21 +94,18 @@ func (p *pgsql) Register(alias ...string) error {
 // UpgradeSchema calls migrate tool to upgrade schema to the latest based on the SQL scripts.
 func (p *pgsql) UpgradeSchema() error {
-    dbURL := url.URL{
-        Scheme:   "postgres",
-        User:     url.UserPassword(p.usr, p.pwd),
-        Host:     fmt.Sprintf("%s:%s", p.host, p.port),
-        Path:     p.database,
-        RawQuery: fmt.Sprintf("sslmode=%s", p.sslmode),
-    }
-    // For UT
-    path := os.Getenv("POSTGRES_MIGRATION_SCRIPTS_PATH")
-    if len(path) == 0 {
-        path = defaultMigrationPath
-    }
-    srcURL := fmt.Sprintf("file://%s", path)
-    m, err := migrate.New(srcURL, dbURL.String())
+    port, err := strconv.ParseInt(p.port, 10, 64)
+    if err != nil {
+        return err
+    }
+    m, err := NewMigrator(&models.PostGreSQL{
+        Host:     p.host,
+        Port:     int(port),
+        Username: p.usr,
+        Password: p.pwd,
+        Database: p.database,
+        SSLMode:  p.sslmode,
+    })
     if err != nil {
         return err
     }
@@ -126,3 +125,22 @@ func (p *pgsql) UpgradeSchema() error {
     }
     return nil
 }
+
+// NewMigrator creates a migrator base on the information
+func NewMigrator(database *models.PostGreSQL) (*migrate.Migrate, error) {
+    dbURL := url.URL{
+        Scheme:   "postgres",
+        User:     url.UserPassword(database.Username, database.Password),
+        Host:     fmt.Sprintf("%s:%d", database.Host, database.Port),
+        Path:     database.Database,
+        RawQuery: fmt.Sprintf("sslmode=%s", database.SSLMode),
+    }
+    // For UT
+    path := os.Getenv("POSTGRES_MIGRATION_SCRIPTS_PATH")
+    if len(path) == 0 {
+        path = defaultMigrationPath
+    }
+    srcURL := fmt.Sprintf("file://%s", path)
+    return migrate.New(srcURL, dbURL.String())
+}
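
The exported NewMigrator makes the golang-migrate handle reusable outside this file; the new migration package introduced below uses it to read the current schema version before deciding what to run. A minimal usage sketch (the connection values are placeholders, not taken from the commit):

    package main

    import (
        "log"

        "github.com/goharbor/harbor/src/common/dao"
        "github.com/goharbor/harbor/src/common/models"
        "github.com/golang-migrate/migrate"
    )

    func main() {
        // placeholder connection settings
        migrator, err := dao.NewMigrator(&models.PostGreSQL{
            Host:     "127.0.0.1",
            Port:     5432,
            Username: "postgres",
            Password: "root123",
            Database: "registry",
            SSLMode:  "disable",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer migrator.Close()

        // Version returns migrate.ErrNilVersion when no migration has run yet
        version, dirty, err := migrator.Version()
        if err != nil && err != migrate.ErrNilVersion {
            log.Fatal(err)
        }
        log.Printf("schema version: %d, dirty: %v", version, dirty)
    }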


@@ -146,7 +146,6 @@ func GetProjects(query *models.ProjectQueryParam) ([]*models.Project, error) {
         p.creation_time, p.update_time ` + sqlStr + ` order by p.name`
     sqlStr, queryParam = CreatePagination(query, sqlStr, queryParam)
-    log.Debugf("sql:=%+v, param= %+v", sqlStr, queryParam)
 
     var projects []*models.Project
     _, err := GetOrmer().Raw(sqlStr, queryParam).QueryRows(&projects)


@@ -64,8 +64,11 @@ func InitDatabaseFromEnv() {
     log.Infof("POSTGRES_HOST: %s, POSTGRES_USR: %s, POSTGRES_PORT: %d, POSTGRES_PWD: %s\n", dbHost, dbUser, dbPort, dbPassword)
-    if err := dao.InitAndUpgradeDatabase(database); err != nil {
-        log.Fatalf("failed to init and upgrade database : %v", err)
+    if err := dao.InitDatabase(database); err != nil {
+        log.Fatalf("failed to init database : %v", err)
+    }
+    if err := dao.UpgradeSchema(database); err != nil {
+        log.Fatalf("failed to upgrade database : %v", err)
     }
     if err := updateUserInitialPassword(1, adminPwd); err != nil {
         log.Fatalf("failed to init password for admin: %v", err)


@@ -17,6 +17,7 @@ package main
 import (
     "encoding/gob"
     "fmt"
+    "github.com/goharbor/harbor/src/migration"
     "os"
     "os/signal"
     "strconv"
@@ -183,9 +184,12 @@ func main() {
     if err != nil {
         log.Fatalf("failed to get database configuration: %v", err)
     }
-    if err := dao.InitAndUpgradeDatabase(database); err != nil {
+    if err := dao.InitDatabase(database); err != nil {
         log.Fatalf("failed to initialize database: %v", err)
     }
+    if err = migration.Migrate(database); err != nil {
+        log.Fatalf("failed to migrate: %v", err)
+    }
     if err := config.Load(); err != nil {
         log.Fatalf("failed to load config: %v", err)
     }
@@ -229,20 +233,6 @@ func main() {
     server.RegisterRoutes()
 
-    syncQuota := os.Getenv("SYNC_QUOTA")
-    doSyncQuota, err := strconv.ParseBool(syncQuota)
-    if err != nil {
-        log.Errorf("Failed to parse SYNC_QUOTA: %v", err)
-        doSyncQuota = true
-    }
-    if doSyncQuota {
-        if err := quotaSync(); err != nil {
-            log.Fatalf("quota migration error, %v", err)
-        }
-    } else {
-        log.Infof("Because SYNC_QUOTA set false , no need to sync quota \n")
-    }
-
     log.Infof("Version: %s, Git commit: %s", version.ReleaseVersion, version.GitCommit)
     beego.RunWithMiddleWares("", middlewares.MiddleWares()...)

src/migration/artifact.go (new file, 87 lines)

@@ -0,0 +1,87 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package migration

import (
    "context"

    "github.com/goharbor/harbor/src/api/artifact/abstractor"
    "github.com/goharbor/harbor/src/common/utils/log"
    "github.com/goharbor/harbor/src/pkg/artifact"
    "github.com/goharbor/harbor/src/pkg/project"
    "github.com/goharbor/harbor/src/pkg/q"
    "github.com/goharbor/harbor/src/pkg/repository"
)

func upgradeData(ctx context.Context) error {
    abstractor := abstractor.NewAbstractor()
    pros, err := project.Mgr.List()
    if err != nil {
        return err
    }
    for _, pro := range pros {
        repos, err := repository.Mgr.List(ctx, &q.Query{
            Keywords: map[string]interface{}{
                "ProjectID": pro.ProjectID,
            },
        })
        if err != nil {
            log.Errorf("failed to list repositories under the project %s: %v, skip", pro.Name, err)
            continue
        }
        for _, repo := range repos {
            log.Debugf("abstracting artifact metadata under repository %s ....", repo.Name)
            arts, err := artifact.Mgr.List(ctx, &q.Query{
                Keywords: map[string]interface{}{
                    "RepositoryID": repo.RepositoryID,
                },
            })
            if err != nil {
                log.Errorf("failed to list artifacts under the repository %s: %v, skip", repo.Name, err)
                continue
            }
            for _, art := range arts {
                if err = abstract(ctx, abstractor, art); err != nil {
                    log.Errorf("failed to abstract the artifact %s@%s: %v, skip", art.RepositoryName, art.Digest, err)
                    continue
                }
                if err = artifact.Mgr.Update(ctx, art); err != nil {
                    log.Errorf("failed to update the artifact %s@%s: %v, skip", repo.Name, art.Digest, err)
                    continue
                }
            }
            log.Debugf("artifact metadata under repository %s abstracted", repo.Name)
        }
    }
    // update data version
    return setDataVersion(ctx, 30)
}

func abstract(ctx context.Context, abstractor abstractor.Abstractor, art *artifact.Artifact) error {
    // abstract the children
    for _, reference := range art.References {
        child, err := artifact.Mgr.Get(ctx, reference.ChildID)
        if err != nil {
            log.Errorf("failed to get the artifact %d: %v, skip", reference.ChildID, err)
            continue
        }
        if err = abstract(ctx, abstractor, child); err != nil {
            log.Errorf("failed to abstract the artifact %s@%s: %v, skip", child.RepositoryName, child.Digest, err)
            continue
        }
    }
    // abstract the parent
    return abstractor.AbstractMetadata(ctx, art)
}
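
The abstract helper walks an artifact tree bottom-up: the children referenced by an index (for example per-platform manifests) are abstracted before the parent itself, and only the parent's result can fail the call since child errors are logged and skipped. A stripped-down sketch of that traversal order, using simplified types rather than the real Harbor structs:

    package main

    import "fmt"

    // Node is a simplified stand-in for artifact.Artifact plus its references.
    type Node struct {
        Digest   string
        Children []*Node
    }

    // abstract visits children first, then the parent, mirroring the order used
    // by migration.abstract: child manifests get their metadata filled in before
    // the index that references them.
    func abstract(n *Node) {
        for _, child := range n.Children {
            abstract(child)
        }
        fmt.Println("abstracting", n.Digest)
    }

    func main() {
        index := &Node{
            Digest: "sha256:index",
            Children: []*Node{
                {Digest: "sha256:amd64"},
                {Digest: "sha256:arm64"},
            },
        }
        abstract(index) // prints amd64, arm64, then index
    }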


@@ -0,0 +1,94 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package migration

import (
    "context"
    "fmt"

    beegorm "github.com/astaxie/beego/orm"
    "github.com/goharbor/harbor/src/common/dao"
    "github.com/goharbor/harbor/src/common/models"
    "github.com/goharbor/harbor/src/common/utils/log"
    "github.com/goharbor/harbor/src/internal/orm"
    "github.com/golang-migrate/migrate"
)

// Migrate the database schema and data
func Migrate(database *models.Database) error {
    // check the database schema version
    migrator, err := dao.NewMigrator(database.PostGreSQL)
    if err != nil {
        return err
    }
    defer migrator.Close()
    schemaVersion, _, err := migrator.Version()
    if err != nil && err != migrate.ErrNilVersion {
        return err
    }
    log.Debugf("current database schema version: %v", schemaVersion)
    // prior to 1.9, version = 0 means fresh install
    if schemaVersion > 0 && schemaVersion < 10 {
        return fmt.Errorf("please upgrade to version 1.9 first")
    }
    // update database schema
    if err := dao.UpgradeSchema(database); err != nil {
        return err
    }
    ctx := orm.NewContext(context.Background(), beegorm.NewOrm())
    dataVersion, err := getDataVersion(ctx)
    if err != nil {
        return err
    }
    log.Debugf("current data version: %v", dataVersion)
    // the abstract logic already done before, skip
    if dataVersion == 30 {
        log.Debug("no change in data, skip")
        return nil
    }
    // upgrade data
    if err = upgradeData(ctx); err != nil {
        return err
    }
    // update quota
    // TODO
    return nil
}

func getDataVersion(ctx context.Context) (int, error) {
    ormer, err := orm.FromContext(ctx)
    if err != nil {
        return 0, err
    }
    var version int
    if err = ormer.Raw("select data_version from schema_migrations").QueryRow(&version); err != nil {
        return 0, err
    }
    return version, nil
}

func setDataVersion(ctx context.Context, version int) error {
    ormer, err := orm.FromContext(ctx)
    if err != nil {
        return err
    }
    _, err = ormer.Raw("update schema_migrations set data_version=?", version).Exec()
    return err
}
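
Migrate is intended to be safe to run on every startup: golang-migrate keeps the schema part idempotent, and the data part is guarded by the new schema_migrations.data_version column, so a second run after a successful upgrade short-circuits. A condensed sketch of the intended call order, mirroring the core/main.go change earlier in this commit (connection settings are placeholders):

    package main

    import (
        "log"

        "github.com/goharbor/harbor/src/common/dao"
        "github.com/goharbor/harbor/src/common/models"
        "github.com/goharbor/harbor/src/migration"
    )

    func main() {
        // placeholder connection settings
        database := &models.Database{
            Type: "postgresql",
            PostGreSQL: &models.PostGreSQL{
                Host:     "127.0.0.1",
                Port:     5432,
                Username: "postgres",
                Password: "root123",
                Database: "registry",
                SSLMode:  "disable",
            },
        }
        // register the ORM database first, then upgrade schema and data
        if err := dao.InitDatabase(database); err != nil {
            log.Fatalf("failed to initialize database: %v", err)
        }
        // safe to rerun: the data migration is skipped once
        // schema_migrations.data_version has been bumped to 30
        if err := migration.Migrate(database); err != nil {
            log.Fatalf("failed to migrate: %v", err)
        }
    }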


@@ -16,8 +16,6 @@ package artifact
 import (
     "context"
-    "time"
 
     "github.com/goharbor/harbor/src/pkg/artifact/dao"
     "github.com/goharbor/harbor/src/pkg/q"
 )
@@ -45,8 +43,8 @@ type Manager interface {
     // Delete just deletes the artifact record. The underlying data of registry will be
     // removed during garbage collection
     Delete(ctx context.Context, id int64) (err error)
-    // UpdatePullTime updates the pull time of the artifact
-    UpdatePullTime(ctx context.Context, artifactID int64, time time.Time) (err error)
+    // Update the artifact. Only the properties specified by "props" will be updated if it is set
+    Update(ctx context.Context, artifact *Artifact, props ...string) (err error)
     // ListReferences according to the query
     ListReferences(ctx context.Context, query *q.Query) (references []*Reference, err error)
     // DeleteReference specified by ID
@@ -123,11 +121,9 @@ func (m *manager) Delete(ctx context.Context, id int64) error {
     // delete artifact
     return m.dao.Delete(ctx, id)
 }
-func (m *manager) UpdatePullTime(ctx context.Context, artifactID int64, time time.Time) error {
-    return m.dao.Update(ctx, &dao.Artifact{
-        ID:       artifactID,
-        PullTime: time,
-    }, "PullTime")
+
+func (m *manager) Update(ctx context.Context, artifact *Artifact, props ...string) (err error) {
+    return m.dao.Update(ctx, artifact.To(), props...)
 }
 
 func (m *manager) ListReferences(ctx context.Context, query *q.Query) ([]*Reference, error) {
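
Replacing UpdatePullTime with a generic Update keeps the manager surface small while still allowing partial updates: callers list the properties to persist via props, as the controller change earlier in this commit does for the pull time. A short sketch of that pattern (the helper name is illustrative; ctx must carry an ORM, as set up via orm.NewContext in the migration package above):

    package example

    import (
        "context"
        "time"

        "github.com/goharbor/harbor/src/common/utils/log"
        "github.com/goharbor/harbor/src/pkg/artifact"
    )

    // touchPullTime persists only the pull_time column of the given artifact:
    // the zero values of the other struct fields are ignored because "PullTime"
    // is the only property passed in props.
    func touchPullTime(ctx context.Context, id int64) {
        if err := artifact.Mgr.Update(ctx, &artifact.Artifact{
            ID:       id,
            PullTime: time.Now(),
        }, "PullTime"); err != nil {
            log.Errorf("failed to update the pull time of artifact %d: %v", id, err)
        }
    }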


@@ -226,9 +226,12 @@ func (m *managerTestSuite) TestDelete() {
     m.dao.AssertExpectations(m.T())
 }
 
-func (m *managerTestSuite) TestUpdatePullTime() {
+func (m *managerTestSuite) TestUpdate() {
     m.dao.On("Update", mock.Anything).Return(nil)
-    err := m.mgr.UpdatePullTime(nil, 1, time.Now())
+    err := m.mgr.Update(nil, &Artifact{
+        ID:       1,
+        PullTime: time.Now(),
+    }, "PullTime")
     m.Require().Nil(err)
     m.dao.AssertExpectations(m.T())
 }


@@ -19,7 +19,6 @@ import (
     "github.com/goharbor/harbor/src/pkg/artifact"
     "github.com/goharbor/harbor/src/pkg/q"
     "github.com/stretchr/testify/mock"
-    "time"
 )
 
 // FakeManager is a fake artifact manager that implement src/pkg/artifact.Manager interface
@@ -76,7 +75,7 @@ func (f *FakeManager) Delete(ctx context.Context, id int64) error {
 }
 
 // UpdatePullTime ...
-func (f *FakeManager) UpdatePullTime(ctx context.Context, artifactID int64, time time.Time) error {
+func (f *FakeManager) Update(ctx context.Context, artifact *artifact.Artifact, props ...string) error {
     args := f.Called()
     return args.Error(0)
 }