mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-27 10:01:27 +01:00
Merge pull request #16107 from chlins/feat/async-update-artifact-pull_count
feat: async update artifact pull time and repository pull count
This commit is contained in:
commit
b417e877b5
@ -492,10 +492,7 @@ tags:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) UpdatePullTime(ctx context.Context, artifactID int64, tagID int64, time time.Time) error {
|
func (c *controller) UpdatePullTime(ctx context.Context, artifactID int64, tagID int64, time time.Time) error {
|
||||||
if err := c.artMgr.Update(ctx, &artifact.Artifact{
|
if err := c.artMgr.UpdatePullTime(ctx, artifactID, time); err != nil {
|
||||||
ID: artifactID,
|
|
||||||
PullTime: time,
|
|
||||||
}, "PullTime"); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tg, err := c.tagCtl.Get(ctx, tagID, nil)
|
tg, err := c.tagCtl.Get(ctx, tagID, nil)
|
||||||
|
@ -562,7 +562,7 @@ func (c *controllerTestSuite) TestUpdatePullTime() {
|
|||||||
},
|
},
|
||||||
}, nil)
|
}, nil)
|
||||||
c.tagCtl.On("Update").Return(nil)
|
c.tagCtl.On("Update").Return(nil)
|
||||||
c.artMgr.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
c.artMgr.On("UpdatePullTime", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
err := c.ctl.UpdatePullTime(nil, 1, 1, time.Now())
|
err := c.ctl.UpdatePullTime(nil, 1, 1, time.Now())
|
||||||
c.Require().Nil(err)
|
c.Require().Nil(err)
|
||||||
c.artMgr.AssertExpectations(c.T())
|
c.artMgr.AssertExpectations(c.T())
|
||||||
@ -578,7 +578,7 @@ func (c *controllerTestSuite) TestUpdatePullTime() {
|
|||||||
ArtifactID: 2,
|
ArtifactID: 2,
|
||||||
},
|
},
|
||||||
}, nil)
|
}, nil)
|
||||||
c.artMgr.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
c.artMgr.On("UpdatePullTime", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
err = c.ctl.UpdatePullTime(nil, 1, 1, time.Now())
|
err = c.ctl.UpdatePullTime(nil, 1, 1, time.Now())
|
||||||
c.Require().NotNil(err)
|
c.Require().NotNil(err)
|
||||||
c.tagCtl.AssertExpectations(c.T())
|
c.tagCtl.AssertExpectations(c.T())
|
||||||
|
@ -16,19 +16,63 @@ package internal
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/goharbor/harbor/src/lib/config"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/controller/artifact"
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
"github.com/goharbor/harbor/src/controller/event"
|
"github.com/goharbor/harbor/src/controller/event"
|
||||||
"github.com/goharbor/harbor/src/controller/repository"
|
"github.com/goharbor/harbor/src/controller/repository"
|
||||||
"github.com/goharbor/harbor/src/controller/tag"
|
"github.com/goharbor/harbor/src/controller/tag"
|
||||||
|
"github.com/goharbor/harbor/src/lib/config"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defaultAsyncFlushDuration is the default flush interval.
|
||||||
|
defaultAsyncFlushDuration = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
asyncFlushDuration time.Duration
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// get async flush duration from env, if not provide,
|
||||||
|
// use default value: 10*time.second
|
||||||
|
envDuration := os.Getenv("ARTIFACT_PULL_ASYNC_FLUSH_DURATION")
|
||||||
|
if len(envDuration) == 0 {
|
||||||
|
// use default value
|
||||||
|
asyncFlushDuration = defaultAsyncFlushDuration
|
||||||
|
} else {
|
||||||
|
duration, err := strconv.ParseInt(envDuration, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("error to parse ARTIFACT_PULL_ASYNC_FLUSH_DURATION: %v, will use default value: %v", err, defaultAsyncFlushDuration)
|
||||||
|
asyncFlushDuration = defaultAsyncFlushDuration
|
||||||
|
} else {
|
||||||
|
asyncFlushDuration = time.Duration(duration) * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Handler preprocess artifact event data
|
// Handler preprocess artifact event data
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
|
once sync.Once
|
||||||
|
// pullCountStore caches the pull count group by repository
|
||||||
|
// map[repositoryID]counts
|
||||||
|
pullCountStore map[int64]uint64
|
||||||
|
// pullCountLock mutex for pullCountStore
|
||||||
|
pullCountLock sync.Mutex
|
||||||
|
// pullTimeStore caches the latest pull time group by artifact
|
||||||
|
// map[artifactID:tagName]time
|
||||||
|
pullTimeStore map[string]time.Time
|
||||||
|
// pullTimeLock mutex for pullTimeStore
|
||||||
|
pullTimeLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name ...
|
// Name ...
|
||||||
@ -55,43 +99,136 @@ func (a *Handler) IsStateful() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Handler) onPull(ctx context.Context, event *event.ArtifactEvent) error {
|
func (a *Handler) onPull(ctx context.Context, event *event.ArtifactEvent) error {
|
||||||
|
// if duration is equal to 0 or negative, keep original sync mode.
|
||||||
|
if asyncFlushDuration <= 0 {
|
||||||
|
var tagName string
|
||||||
|
if len(event.Tags) > 0 {
|
||||||
|
tagName = event.Tags[0]
|
||||||
|
}
|
||||||
|
|
||||||
if !config.PullTimeUpdateDisable(ctx) {
|
if !config.PullTimeUpdateDisable(ctx) {
|
||||||
go func() { a.updatePullTime(ctx, event) }()
|
a.syncFlushPullTime(ctx, event.Artifact.ID, tagName, time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
if !config.PullCountUpdateDisable(ctx) {
|
if !config.PullCountUpdateDisable(ctx) {
|
||||||
go func() { a.addPullCount(ctx, event) }()
|
a.syncFlushPullCount(ctx, event.Artifact.RepositoryID, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Handler) updatePullTime(ctx context.Context, event *event.ArtifactEvent) {
|
// async mode, update in cache firstly and flush to db by workers periodically.
|
||||||
var tagID int64
|
a.once.Do(func() {
|
||||||
|
if !config.PullTimeUpdateDisable(ctx) {
|
||||||
|
a.pullTimeStore = make(map[string]time.Time)
|
||||||
|
go a.asyncFlushPullTime(orm.Context())
|
||||||
|
}
|
||||||
|
|
||||||
|
if !config.PullCountUpdateDisable(ctx) {
|
||||||
|
a.pullCountStore = make(map[int64]uint64)
|
||||||
|
go a.asyncFlushPullCount(orm.Context())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if !config.PullTimeUpdateDisable(ctx) {
|
||||||
|
a.updatePullTimeInCache(ctx, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !config.PullCountUpdateDisable(ctx) {
|
||||||
|
a.addPullCountInCache(ctx, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Handler) updatePullTimeInCache(ctx context.Context, event *event.ArtifactEvent) {
|
||||||
|
var tagName string
|
||||||
if len(event.Tags) != 0 {
|
if len(event.Tags) != 0 {
|
||||||
tags, err := tag.Ctl.List(ctx, &q.Query{
|
tagName = event.Tags[0]
|
||||||
Keywords: map[string]interface{}{
|
}
|
||||||
"ArtifactID": event.Artifact.ID,
|
|
||||||
"Name": event.Tags[0],
|
key := fmt.Sprintf("%d:%s", event.Artifact.ID, tagName)
|
||||||
},
|
|
||||||
}, nil)
|
a.pullTimeLock.Lock()
|
||||||
|
defer a.pullTimeLock.Unlock()
|
||||||
|
|
||||||
|
a.pullTimeStore[key] = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Handler) addPullCountInCache(ctx context.Context, event *event.ArtifactEvent) {
|
||||||
|
a.pullCountLock.Lock()
|
||||||
|
defer a.pullCountLock.Unlock()
|
||||||
|
|
||||||
|
a.pullCountStore[event.Artifact.RepositoryID] = a.pullCountStore[event.Artifact.RepositoryID] + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Handler) syncFlushPullTime(ctx context.Context, artifactID int64, tagName string, time time.Time) {
|
||||||
|
var tagID int64
|
||||||
|
|
||||||
|
if tagName != "" {
|
||||||
|
tags, err := tag.Ctl.List(ctx, q.New(
|
||||||
|
map[string]interface{}{
|
||||||
|
"ArtifactID": artifactID,
|
||||||
|
"Name": tagName,
|
||||||
|
}), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Infof("failed to list tags when to update pull time, %v", err)
|
log.Warningf("failed to list tags when to update pull time, %v", err)
|
||||||
} else {
|
} else {
|
||||||
if len(tags) != 0 {
|
if len(tags) != 0 {
|
||||||
tagID = tags[0].ID
|
tagID = tags[0].ID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := artifact.Ctl.UpdatePullTime(ctx, event.Artifact.ID, tagID, time.Now()); err != nil {
|
|
||||||
log.Debugf("failed to update pull time form artifact %d, %v", event.Artifact.ID, err)
|
if err := artifact.Ctl.UpdatePullTime(ctx, artifactID, tagID, time); err != nil {
|
||||||
|
log.Warningf("failed to update pull time for artifact %d, %v", artifactID, err)
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Handler) addPullCount(ctx context.Context, event *event.ArtifactEvent) {
|
func (a *Handler) syncFlushPullCount(ctx context.Context, repositoryID int64, count uint64) {
|
||||||
if err := repository.Ctl.AddPullCount(ctx, event.Artifact.RepositoryID); err != nil {
|
if err := repository.Ctl.AddPullCount(ctx, repositoryID, count); err != nil {
|
||||||
log.Debugf("failed to add pull count repository %d, %v", event.Artifact.RepositoryID, err)
|
log.Warningf("failed to add pull count repository %d, %v", repositoryID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Handler) asyncFlushPullTime(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
<-time.After(asyncFlushDuration)
|
||||||
|
a.pullTimeLock.Lock()
|
||||||
|
|
||||||
|
for key, time := range a.pullTimeStore {
|
||||||
|
keys := strings.Split(key, ":")
|
||||||
|
artifactID, err := strconv.ParseInt(keys[0], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("failed to parse artifact id %s, %v", key, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var tagName string
|
||||||
|
if len(keys) > 1 && keys[1] != "" {
|
||||||
|
tagName = keys[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
a.syncFlushPullTime(ctx, artifactID, tagName, time)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.pullTimeStore = make(map[string]time.Time)
|
||||||
|
a.pullTimeLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Handler) asyncFlushPullCount(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
<-time.After(asyncFlushDuration)
|
||||||
|
a.pullCountLock.Lock()
|
||||||
|
|
||||||
|
for repositoryID, count := range a.pullCountStore {
|
||||||
|
a.syncFlushPullCount(ctx, repositoryID, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.pullCountStore = make(map[int64]uint64)
|
||||||
|
a.pullCountLock.Unlock()
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Handler) onPush(ctx context.Context, event *event.ArtifactEvent) error {
|
func (a *Handler) onPush(ctx context.Context, event *event.ArtifactEvent) error {
|
||||||
|
@ -1 +1,144 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
beegoorm "github.com/astaxie/beego/orm"
|
||||||
|
common_dao "github.com/goharbor/harbor/src/common/dao"
|
||||||
|
"github.com/goharbor/harbor/src/controller/event"
|
||||||
|
"github.com/goharbor/harbor/src/lib/config"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/artifact"
|
||||||
|
_ "github.com/goharbor/harbor/src/pkg/config/db"
|
||||||
|
repo "github.com/goharbor/harbor/src/pkg/repository"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/repository/model"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/tag"
|
||||||
|
tagmodel "github.com/goharbor/harbor/src/pkg/tag/model/tag"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ArtifactHandlerTestSuite is test suite for artifact handler.
|
||||||
|
type ArtifactHandlerTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
handler *Handler
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestArtifactHandler tests ArtifactHandler.
|
||||||
|
func TestArtifactHandler(t *testing.T) {
|
||||||
|
suite.Run(t, &ArtifactHandlerTestSuite{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupSuite prepares for running ArtifactHandlerTestSuite.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) SetupSuite() {
|
||||||
|
common_dao.PrepareTestForPostgresSQL()
|
||||||
|
config.Init()
|
||||||
|
suite.handler = &Handler{}
|
||||||
|
suite.ctx = orm.NewContext(context.TODO(), beegoorm.NewOrm())
|
||||||
|
|
||||||
|
// mock artifact
|
||||||
|
_, err := artifact.Mgr.Create(suite.ctx, &artifact.Artifact{ID: 1, RepositoryID: 1})
|
||||||
|
suite.Nil(err)
|
||||||
|
// mock repository
|
||||||
|
_, err = repo.Mgr.Create(suite.ctx, &model.RepoRecord{RepositoryID: 1})
|
||||||
|
suite.Nil(err)
|
||||||
|
// mock tag
|
||||||
|
_, err = tag.Mgr.Create(suite.ctx, &tagmodel.Tag{ID: 1, RepositoryID: 1, ArtifactID: 1, Name: "latest"})
|
||||||
|
suite.Nil(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TearDownSuite cleans environment.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) TearDownSuite() {
|
||||||
|
// delete tag
|
||||||
|
err := tag.Mgr.Delete(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
// delete artifact
|
||||||
|
err = artifact.Mgr.Delete(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
// delete repository
|
||||||
|
err = repo.Mgr.Delete(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestName tests method Name.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) TestName() {
|
||||||
|
suite.Equal("InternalArtifact", suite.handler.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestIsStateful tests method IsStateful.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) TestIsStateful() {
|
||||||
|
suite.False(suite.handler.IsStateful(), "artifact handler is not stateful")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDefaultAsyncDuration tests default value of async flush duration.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) TestDefaultAsyncFlushDuration() {
|
||||||
|
suite.Equal(defaultAsyncFlushDuration, asyncFlushDuration, "default async flush duration")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestOnPush tests handle push events.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) TestOnPush() {
|
||||||
|
err := suite.handler.onPush(context.TODO(), &event.ArtifactEvent{Artifact: &artifact.Artifact{}})
|
||||||
|
suite.Nil(err, "onPush should return nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestOnPull tests handler pull events.
|
||||||
|
func (suite *ArtifactHandlerTestSuite) TestOnPull() {
|
||||||
|
// test sync mode
|
||||||
|
asyncFlushDuration = 0
|
||||||
|
err := suite.handler.onPull(suite.ctx, &event.ArtifactEvent{Artifact: &artifact.Artifact{ID: 1, RepositoryID: 1}, Tags: []string{"latest"}})
|
||||||
|
suite.Nil(err, "onPull should return nil")
|
||||||
|
// sync mode should update db immediately
|
||||||
|
// pull_time
|
||||||
|
art, err := artifact.Mgr.Get(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
suite.False(art.PullTime.IsZero(), "sync update pull_time")
|
||||||
|
lastPullTime := art.PullTime
|
||||||
|
// pull_count
|
||||||
|
repository, err := repo.Mgr.Get(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
suite.Equal(int64(1), repository.PullCount, "sync update pull_count")
|
||||||
|
|
||||||
|
// test async mode
|
||||||
|
asyncFlushDuration = 200 * time.Millisecond
|
||||||
|
err = suite.handler.onPull(suite.ctx, &event.ArtifactEvent{Artifact: &artifact.Artifact{ID: 1, RepositoryID: 1}, Tags: []string{"latest"}})
|
||||||
|
suite.Nil(err, "onPull should return nil")
|
||||||
|
// async mode should not update db immediately
|
||||||
|
// pull_time
|
||||||
|
art, err = artifact.Mgr.Get(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
suite.Equal(lastPullTime, art.PullTime, "pull_time should not be updated immediately")
|
||||||
|
// pull_count
|
||||||
|
repository, err = repo.Mgr.Get(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
suite.Equal(int64(1), repository.PullCount, "pull_count should not be updated immediately")
|
||||||
|
// wait for db update
|
||||||
|
suite.Eventually(func() bool {
|
||||||
|
art, err = artifact.Mgr.Get(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
return art.PullTime.After(lastPullTime)
|
||||||
|
}, 3*asyncFlushDuration, asyncFlushDuration/2, "wait for pull_time async update")
|
||||||
|
|
||||||
|
suite.Eventually(func() bool {
|
||||||
|
repository, err = repo.Mgr.Get(suite.ctx, 1)
|
||||||
|
suite.Nil(err)
|
||||||
|
return int64(2) == repository.PullCount
|
||||||
|
}, 3*asyncFlushDuration, asyncFlushDuration/2, "wait for pull_count async update")
|
||||||
|
}
|
||||||
|
@ -52,8 +52,8 @@ type Controller interface {
|
|||||||
Delete(ctx context.Context, id int64) (err error)
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
// Update the repository. Specify the properties or all properties will be updated
|
// Update the repository. Specify the properties or all properties will be updated
|
||||||
Update(ctx context.Context, repository *model.RepoRecord, properties ...string) (err error)
|
Update(ctx context.Context, repository *model.RepoRecord, properties ...string) (err error)
|
||||||
// AddPullCount increase one pull count for the specified repository
|
// AddPullCount increase pull count for the specified repository
|
||||||
AddPullCount(ctx context.Context, id int64) error
|
AddPullCount(ctx context.Context, id int64, count uint64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController creates an instance of the default repository controller
|
// NewController creates an instance of the default repository controller
|
||||||
@ -182,6 +182,6 @@ func (c *controller) Update(ctx context.Context, repository *model.RepoRecord, p
|
|||||||
return c.repoMgr.Update(ctx, repository, properties...)
|
return c.repoMgr.Update(ctx, repository, properties...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) AddPullCount(ctx context.Context, id int64) error {
|
func (c *controller) AddPullCount(ctx context.Context, id int64, count uint64) error {
|
||||||
return c.repoMgr.AddPullCount(ctx, id)
|
return c.repoMgr.AddPullCount(ctx, id, count)
|
||||||
}
|
}
|
||||||
|
@ -143,8 +143,8 @@ func (c *controllerTestSuite) TestUpdate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *controllerTestSuite) TestAddPullCount() {
|
func (c *controllerTestSuite) TestAddPullCount() {
|
||||||
c.repoMgr.On("AddPullCount", mock.Anything, mock.Anything).Return(nil)
|
c.repoMgr.On("AddPullCount", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
err := c.ctl.AddPullCount(nil, 1)
|
err := c.ctl.AddPullCount(nil, 1, 1)
|
||||||
c.Require().Nil(err)
|
c.Require().Nil(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
beegoorm "github.com/astaxie/beego/orm"
|
beegoorm "github.com/astaxie/beego/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
@ -42,6 +43,8 @@ type DAO interface {
|
|||||||
Delete(ctx context.Context, id int64) (err error)
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
// Update updates the artifact. Only the properties specified by "props" will be updated if it is set
|
// Update updates the artifact. Only the properties specified by "props" will be updated if it is set
|
||||||
Update(ctx context.Context, artifact *Artifact, props ...string) (err error)
|
Update(ctx context.Context, artifact *Artifact, props ...string) (err error)
|
||||||
|
// UpdatePullTime updates artifact pull time by ID.
|
||||||
|
UpdatePullTime(ctx context.Context, id int64, pullTime time.Time) (err error)
|
||||||
// CreateReference creates the artifact reference
|
// CreateReference creates the artifact reference
|
||||||
CreateReference(ctx context.Context, reference *ArtifactReference) (id int64, err error)
|
CreateReference(ctx context.Context, reference *ArtifactReference) (id int64, err error)
|
||||||
// ListReferences lists the artifact references according to the query
|
// ListReferences lists the artifact references according to the query
|
||||||
@ -180,6 +183,7 @@ func (d *dao) Update(ctx context.Context, artifact *Artifact, props ...string) e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := ormer.Update(artifact, props...)
|
n, err := ormer.Update(artifact, props...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -189,6 +193,27 @@ func (d *dao) Update(ctx context.Context, artifact *Artifact, props ...string) e
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *dao) UpdatePullTime(ctx context.Context, id int64, pullTime time.Time) error {
|
||||||
|
ormer, err := orm.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// can only be retained to the second if not format
|
||||||
|
formatPullTime := pullTime.Format("2006-01-02 15:04:05.999999")
|
||||||
|
// update db only if pull_time is null or pull_time < (in-coming)pullTime
|
||||||
|
sql := "UPDATE artifact SET pull_time = ? WHERE id = ? AND (pull_time IS NULL OR pull_time < ?)"
|
||||||
|
args := []interface{}{formatPullTime, id, formatPullTime}
|
||||||
|
|
||||||
|
_, err = ormer.Raw(sql, args...).Exec()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *dao) CreateReference(ctx context.Context, reference *ArtifactReference) (int64, error) {
|
func (d *dao) CreateReference(ctx context.Context, reference *ArtifactReference) (int64, error) {
|
||||||
ormer, err := orm.FromContext(ctx)
|
ormer, err := orm.FromContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -16,6 +16,9 @@ package dao
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
beegoorm "github.com/astaxie/beego/orm"
|
beegoorm "github.com/astaxie/beego/orm"
|
||||||
common_dao "github.com/goharbor/harbor/src/common/dao"
|
common_dao "github.com/goharbor/harbor/src/common/dao"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
@ -25,8 +28,6 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/pkg/tag/model/tag"
|
"github.com/goharbor/harbor/src/pkg/tag/model/tag"
|
||||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type daoTestSuite struct {
|
type daoTestSuite struct {
|
||||||
@ -396,6 +397,32 @@ func (d *daoTestSuite) TestUpdate() {
|
|||||||
d.True(errors.IsErr(err, errors.NotFoundCode))
|
d.True(errors.IsErr(err, errors.NotFoundCode))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TestUpdatePullTime() {
|
||||||
|
artifact, err := d.dao.Get(d.ctx, d.parentArtID)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.Require().NotNil(artifact)
|
||||||
|
oldPullTime := artifact.PullTime
|
||||||
|
// case for pull_time before db pull_time.
|
||||||
|
before := oldPullTime.AddDate(0, 0, -1)
|
||||||
|
err = d.dao.UpdatePullTime(d.ctx, d.parentArtID, before)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
|
||||||
|
artifact, err = d.dao.Get(d.ctx, d.parentArtID)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.Require().NotNil(artifact)
|
||||||
|
d.Equal(oldPullTime.Unix(), artifact.PullTime.Unix())
|
||||||
|
|
||||||
|
// case for pull_time after db pull_time.
|
||||||
|
after := oldPullTime.AddDate(0, 0, 1)
|
||||||
|
err = d.dao.UpdatePullTime(d.ctx, d.parentArtID, after)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
|
||||||
|
artifact, err = d.dao.Get(d.ctx, d.parentArtID)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.Require().NotNil(artifact)
|
||||||
|
d.Equal(after.Unix(), artifact.PullTime.Unix())
|
||||||
|
}
|
||||||
|
|
||||||
func (d *daoTestSuite) TestCreateReference() {
|
func (d *daoTestSuite) TestCreateReference() {
|
||||||
// happy pass is covered in SetupTest
|
// happy pass is covered in SetupTest
|
||||||
|
|
||||||
|
@ -16,6 +16,8 @@ package artifact
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
"github.com/goharbor/harbor/src/pkg/artifact/dao"
|
"github.com/goharbor/harbor/src/pkg/artifact/dao"
|
||||||
)
|
)
|
||||||
@ -45,6 +47,8 @@ type Manager interface {
|
|||||||
Delete(ctx context.Context, id int64) (err error)
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
// Update the artifact. Only the properties specified by "props" will be updated if it is set
|
// Update the artifact. Only the properties specified by "props" will be updated if it is set
|
||||||
Update(ctx context.Context, artifact *Artifact, props ...string) (err error)
|
Update(ctx context.Context, artifact *Artifact, props ...string) (err error)
|
||||||
|
// UpdatePullTime updates artifact pull time by ID.
|
||||||
|
UpdatePullTime(ctx context.Context, id int64, pullTime time.Time) (err error)
|
||||||
// ListReferences according to the query
|
// ListReferences according to the query
|
||||||
ListReferences(ctx context.Context, query *q.Query) (references []*Reference, err error)
|
ListReferences(ctx context.Context, query *q.Query) (references []*Reference, err error)
|
||||||
// DeleteReference specified by ID
|
// DeleteReference specified by ID
|
||||||
@ -126,6 +130,10 @@ func (m *manager) Update(ctx context.Context, artifact *Artifact, props ...strin
|
|||||||
return m.dao.Update(ctx, artifact.To(), props...)
|
return m.dao.Update(ctx, artifact.To(), props...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *manager) UpdatePullTime(ctx context.Context, id int64, pullTime time.Time) (err error) {
|
||||||
|
return m.dao.UpdatePullTime(ctx, id, pullTime)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *manager) ListReferences(ctx context.Context, query *q.Query) ([]*Reference, error) {
|
func (m *manager) ListReferences(ctx context.Context, query *q.Query) ([]*Reference, error) {
|
||||||
references, err := m.dao.ListReferences(ctx, query)
|
references, err := m.dao.ListReferences(ctx, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -58,6 +58,10 @@ func (f *fakeDao) Update(ctx context.Context, artifact *dao.Artifact, props ...s
|
|||||||
args := f.Called()
|
args := f.Called()
|
||||||
return args.Error(0)
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
func (f *fakeDao) UpdatePullTime(ctx context.Context, id int64, pullTime time.Time) error {
|
||||||
|
args := f.Called()
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
func (f *fakeDao) CreateReference(ctx context.Context, reference *dao.ArtifactReference) (int64, error) {
|
func (f *fakeDao) CreateReference(ctx context.Context, reference *dao.ArtifactReference) (int64, error) {
|
||||||
args := f.Called()
|
args := f.Called()
|
||||||
return int64(args.Int(0)), args.Error(1)
|
return int64(args.Int(0)), args.Error(1)
|
||||||
@ -233,6 +237,13 @@ func (m *managerTestSuite) TestUpdate() {
|
|||||||
m.dao.AssertExpectations(m.T())
|
m.dao.AssertExpectations(m.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *managerTestSuite) TestUpdatePullTime() {
|
||||||
|
m.dao.On("UpdatePullTime", mock.Anything).Return(nil)
|
||||||
|
err := m.mgr.UpdatePullTime(nil, 1, time.Now())
|
||||||
|
m.Require().Nil(err)
|
||||||
|
m.dao.AssertExpectations(m.T())
|
||||||
|
}
|
||||||
|
|
||||||
func (m *managerTestSuite) TestListReferences() {
|
func (m *managerTestSuite) TestListReferences() {
|
||||||
m.dao.On("ListReferences").Return([]*dao.ArtifactReference{
|
m.dao.On("ListReferences").Return([]*dao.ArtifactReference{
|
||||||
{
|
{
|
||||||
|
@ -39,8 +39,8 @@ type DAO interface {
|
|||||||
Delete(ctx context.Context, id int64) (err error)
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
// Update updates the repository. Only the properties specified by "props" will be updated if it is set
|
// Update updates the repository. Only the properties specified by "props" will be updated if it is set
|
||||||
Update(ctx context.Context, repository *model.RepoRecord, props ...string) (err error)
|
Update(ctx context.Context, repository *model.RepoRecord, props ...string) (err error)
|
||||||
// AddPullCount increase one pull count for the specified repository
|
// AddPullCount increase pull count for the specified repository
|
||||||
AddPullCount(ctx context.Context, id int64) error
|
AddPullCount(ctx context.Context, id int64, count uint64) error
|
||||||
// NonEmptyRepos returns the repositories without any artifact or all the artifacts are untagged.
|
// NonEmptyRepos returns the repositories without any artifact or all the artifacts are untagged.
|
||||||
NonEmptyRepos(ctx context.Context) ([]*model.RepoRecord, error)
|
NonEmptyRepos(ctx context.Context) ([]*model.RepoRecord, error)
|
||||||
}
|
}
|
||||||
@ -132,14 +132,14 @@ func (d *dao) Update(ctx context.Context, repository *model.RepoRecord, props ..
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dao) AddPullCount(ctx context.Context, id int64) error {
|
func (d *dao) AddPullCount(ctx context.Context, id int64, count uint64) error {
|
||||||
ormer, err := orm.FromContext(ctx)
|
ormer, err := orm.FromContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
num, err := ormer.QueryTable(new(model.RepoRecord)).Filter("RepositoryID", id).Update(
|
num, err := ormer.QueryTable(new(model.RepoRecord)).Filter("RepositoryID", id).Update(
|
||||||
o.Params{
|
o.Params{
|
||||||
"pull_count": o.ColValue(o.ColAdd, 1),
|
"pull_count": o.ColValue(o.ColAdd, count),
|
||||||
"update_time": time.Now(),
|
"update_time": time.Now(),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -17,6 +17,9 @@ package dao
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
beegoorm "github.com/astaxie/beego/orm"
|
beegoorm "github.com/astaxie/beego/orm"
|
||||||
common_dao "github.com/goharbor/harbor/src/common/dao"
|
common_dao "github.com/goharbor/harbor/src/common/dao"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
@ -28,8 +31,6 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/pkg/tag/model/tag"
|
"github.com/goharbor/harbor/src/pkg/tag/model/tag"
|
||||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -179,7 +180,7 @@ func (d *daoTestSuite) TestAddPullCount() {
|
|||||||
id, err := d.dao.Create(d.ctx, repository)
|
id, err := d.dao.Create(d.ctx, repository)
|
||||||
d.Require().Nil(err)
|
d.Require().Nil(err)
|
||||||
|
|
||||||
err = d.dao.AddPullCount(d.ctx, id)
|
err = d.dao.AddPullCount(d.ctx, id, 1)
|
||||||
d.Require().Nil(err)
|
d.Require().Nil(err)
|
||||||
|
|
||||||
repository, err = d.dao.Get(d.ctx, id)
|
repository, err = d.dao.Get(d.ctx, id)
|
||||||
|
@ -42,8 +42,8 @@ type Manager interface {
|
|||||||
Delete(ctx context.Context, id int64) (err error)
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
// Update updates the repository. Only the properties specified by "props" will be updated if it is set
|
// Update updates the repository. Only the properties specified by "props" will be updated if it is set
|
||||||
Update(ctx context.Context, repository *model.RepoRecord, props ...string) (err error)
|
Update(ctx context.Context, repository *model.RepoRecord, props ...string) (err error)
|
||||||
// AddPullCount increase one pull count for the specified repository
|
// AddPullCount increase pull count for the specified repository
|
||||||
AddPullCount(ctx context.Context, id int64) error
|
AddPullCount(ctx context.Context, id int64, count uint64) error
|
||||||
// NonEmptyRepos returns the repositories without any artifact or all the artifacts are untagged.
|
// NonEmptyRepos returns the repositories without any artifact or all the artifacts are untagged.
|
||||||
NonEmptyRepos(ctx context.Context) ([]*model.RepoRecord, error)
|
NonEmptyRepos(ctx context.Context) ([]*model.RepoRecord, error)
|
||||||
}
|
}
|
||||||
@ -102,8 +102,8 @@ func (m *manager) Update(ctx context.Context, repository *model.RepoRecord, prop
|
|||||||
return m.dao.Update(ctx, repository, props...)
|
return m.dao.Update(ctx, repository, props...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) AddPullCount(ctx context.Context, id int64) error {
|
func (m *manager) AddPullCount(ctx context.Context, id int64, count uint64) error {
|
||||||
return m.dao.AddPullCount(ctx, id)
|
return m.dao.AddPullCount(ctx, id, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) NonEmptyRepos(ctx context.Context) ([]*model.RepoRecord, error) {
|
func (m *manager) NonEmptyRepos(ctx context.Context) ([]*model.RepoRecord, error) {
|
||||||
|
@ -108,8 +108,8 @@ func (m *managerTestSuite) TestUpdate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *managerTestSuite) TestAddPullCount() {
|
func (m *managerTestSuite) TestAddPullCount() {
|
||||||
m.dao.On("AddPullCount", mock.Anything, mock.Anything).Return(nil)
|
m.dao.On("AddPullCount", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
err := m.mgr.AddPullCount(context.Background(), 1)
|
err := m.mgr.AddPullCount(context.Background(), 1, 1)
|
||||||
m.Require().Nil(err)
|
m.Require().Nil(err)
|
||||||
m.dao.AssertExpectations(m.T())
|
m.dao.AssertExpectations(m.T())
|
||||||
}
|
}
|
||||||
|
@ -16,13 +16,13 @@ type Controller struct {
|
|||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPullCount provides a mock function with given fields: ctx, id
|
// AddPullCount provides a mock function with given fields: ctx, id, count
|
||||||
func (_m *Controller) AddPullCount(ctx context.Context, id int64) error {
|
func (_m *Controller) AddPullCount(ctx context.Context, id int64, count uint64) error {
|
||||||
ret := _m.Called(ctx, id)
|
ret := _m.Called(ctx, id, count)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
|
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) error); ok {
|
||||||
r0 = rf(ctx, id)
|
r0 = rf(ctx, id, count)
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Error(0)
|
r0 = ret.Error(0)
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
mock "github.com/stretchr/testify/mock"
|
mock "github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
q "github.com/goharbor/harbor/src/lib/q"
|
q "github.com/goharbor/harbor/src/lib/q"
|
||||||
|
|
||||||
|
time "time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Manager is an autogenerated mock type for the Manager type
|
// Manager is an autogenerated mock type for the Manager type
|
||||||
@ -199,3 +201,17 @@ func (_m *Manager) Update(ctx context.Context, _a1 *artifact.Artifact, props ...
|
|||||||
|
|
||||||
return r0
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdatePullTime provides a mock function with given fields: ctx, id, pullTime
|
||||||
|
func (_m *Manager) UpdatePullTime(ctx context.Context, id int64, pullTime time.Time) error {
|
||||||
|
ret := _m.Called(ctx, id, pullTime)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, int64, time.Time) error); ok {
|
||||||
|
r0 = rf(ctx, id, pullTime)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
@ -17,13 +17,13 @@ type DAO struct {
|
|||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPullCount provides a mock function with given fields: ctx, id
|
// AddPullCount provides a mock function with given fields: ctx, id, count
|
||||||
func (_m *DAO) AddPullCount(ctx context.Context, id int64) error {
|
func (_m *DAO) AddPullCount(ctx context.Context, id int64, count uint64) error {
|
||||||
ret := _m.Called(ctx, id)
|
ret := _m.Called(ctx, id, count)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
|
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) error); ok {
|
||||||
r0 = rf(ctx, id)
|
r0 = rf(ctx, id, count)
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Error(0)
|
r0 = ret.Error(0)
|
||||||
}
|
}
|
||||||
|
@ -16,13 +16,13 @@ type Manager struct {
|
|||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPullCount provides a mock function with given fields: ctx, id
|
// AddPullCount provides a mock function with given fields: ctx, id, count
|
||||||
func (_m *Manager) AddPullCount(ctx context.Context, id int64) error {
|
func (_m *Manager) AddPullCount(ctx context.Context, id int64, count uint64) error {
|
||||||
ret := _m.Called(ctx, id)
|
ret := _m.Called(ctx, id, count)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
|
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) error); ok {
|
||||||
r0 = rf(ctx, id)
|
r0 = rf(ctx, id, count)
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Error(0)
|
r0 = ret.Error(0)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user