fix blob deleting status issue (#12481)

1, The update blob status method should udpate the blob version of the blob object as well, otherwise the GC job cannot handle the blob status transform(none - delete - deleting - deletefailed)
as the method is using version equals as the query condition.
2, For the deleting blob which marked for more than 2 hours, it should be set to delete failed in head blob & put manifest request

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
Wang Yan 2020-07-20 11:44:29 +08:00 committed by GitHub
parent 5a898c1661
commit 24ed52112e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 149 additions and 52 deletions

View File

@ -79,6 +79,9 @@ type Controller interface {
// Touch updates the blob status to StatusNone and increase version every time. // Touch updates the blob status to StatusNone and increase version every time.
Touch(ctx context.Context, blob *blob.Blob) error Touch(ctx context.Context, blob *blob.Blob) error
// Fail updates the blob status to StatusDeleteFailed and increase version every time.
Fail(ctx context.Context, blob *blob.Blob) error
// Update updates the blob, it cannot handle blob status transitions. // Update updates the blob, it cannot handle blob status transitions.
Update(ctx context.Context, blob *blob.Blob) error Update(ctx context.Context, blob *blob.Blob) error
@ -336,6 +339,18 @@ func (c *controller) Touch(ctx context.Context, blob *blob.Blob) error {
return nil return nil
} }
func (c *controller) Fail(ctx context.Context, blob *blob.Blob) error {
blob.Status = blob_models.StatusDeleteFailed
count, err := c.blobMgr.UpdateBlobStatus(ctx, blob)
if err != nil {
return err
}
if count == 0 {
return errors.New(nil).WithMessage(fmt.Sprintf("no blob item is updated to StatusDeleteFailed, id:%d, digest:%s", blob.ID, blob.Digest)).WithCode(errors.NotFoundCode)
}
return nil
}
func (c *controller) Update(ctx context.Context, blob *blob.Blob) error { func (c *controller) Update(ctx context.Context, blob *blob.Blob) error {
return c.blobMgr.Update(ctx, blob) return c.blobMgr.Update(ctx, blob)
} }

View File

@ -292,6 +292,40 @@ func (suite *ControllerTestSuite) TestTouch() {
suite.Equal(blob.Status, models.StatusNone) suite.Equal(blob.Status, models.StatusNone)
} }
func (suite *ControllerTestSuite) TestFail() {
ctx := suite.Context()
err := Ctl.Fail(ctx, &blob.Blob{
Status: models.StatusNone,
})
suite.NotNil(err)
suite.True(errors.IsNotFoundErr(err))
digest := suite.prepareBlob()
blob, err := Ctl.Get(ctx, digest)
suite.Nil(err)
blob.Status = models.StatusDelete
_, err = pkg_blob.Mgr.UpdateBlobStatus(suite.Context(), blob)
suite.Nil(err)
// StatusDelete cannot be marked as StatusDeleteFailed
err = Ctl.Fail(ctx, blob)
suite.NotNil(err)
suite.True(errors.IsNotFoundErr(err))
blob.Status = models.StatusDeleting
_, err = pkg_blob.Mgr.UpdateBlobStatus(suite.Context(), blob)
suite.Nil(err)
err = Ctl.Fail(ctx, blob)
suite.Nil(err)
blobAfter, err := Ctl.Get(ctx, digest)
suite.Nil(err)
suite.Equal(models.StatusDeleteFailed, blobAfter.Status)
}
func (suite *ControllerTestSuite) TestDelete() { func (suite *ControllerTestSuite) TestDelete() {
ctx := suite.Context() ctx := suite.Context()

View File

@ -22,7 +22,6 @@ import (
"strings" "strings"
"time" "time"
beego_orm "github.com/astaxie/beego/orm"
"github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/lib/q"
@ -180,41 +179,29 @@ func (d *dao) UpdateBlobStatus(ctx context.Context, blob *models.Blob) (int64, e
return -1, err return -1, err
} }
// each update will auto increase version and update time var sql string
data := make(beego_orm.Params)
data["version"] = beego_orm.ColValue(beego_orm.ColAdd, 1)
data["update_time"] = time.Now()
data["status"] = blob.Status
qt := o.QueryTable(&models.Blob{})
cond := beego_orm.NewCondition()
var c *beego_orm.Condition
// In the multiple blob head scenario, if one request success mark the blob from StatusDelete to StatusNone, then version should increase one.
// in the meantime, the other requests tries to do the same thing, use 'where version >= blob.version' can handle it.
if blob.Status == models.StatusNone { if blob.Status == models.StatusNone {
c = cond.And("version__gte", blob.Version) sql = `UPDATE blob SET version = version + 1, update_time = ?, status = ? where id = ? AND version >= ? AND status IN (%s) RETURNING version as new_vesrion`
} else { } else {
c = cond.And("version", blob.Version) sql = `UPDATE blob SET version = version + 1, update_time = ?, status = ? where id = ? AND version = ? AND status IN (%s) RETURNING version as new_vesrion`
} }
/* var newVersion int64
generated simple sql string. params := []interface{}{time.Now(), blob.Status, blob.ID, blob.Version}
UPDATE "blob" SET "version" = "version" + $1, "update_time" = $2, "status" = $3 stats := models.StatusMap[blob.Status]
WHERE "id" IN ( SELECT T0."id" FROM "blob" T0 WHERE T0."version" >= $4 AND T0."id" = $5 AND T0."status" IN ('delete', 'deleting') ) for _, stat := range stats {
*/ params = append(params, stat)
count, err := qt.SetCond(c).Filter("id", blob.ID).
Filter("status__in", models.StatusMap[blob.Status]).
Update(data)
if err != nil {
return count, err
} }
if count == 0 { if err := o.Raw(fmt.Sprintf(sql, orm.ParamPlaceholderForIn(len(models.StatusMap[blob.Status]))), params...).QueryRow(&newVersion); err != nil {
log.Warningf("no blob is updated according to query condition, id: %d, status_in, %v", blob.ID, models.StatusMap[blob.Status]) if e := orm.AsNotFoundError(err, "no blob is updated"); e != nil {
log.Warningf("no blob is updated according to query condition, id: %d, status_in, %v, err: %v", blob.ID, models.StatusMap[blob.Status], e)
return 0, nil return 0, nil
} }
return count, nil return -1, err
}
blob.Version = newVersion
return 1, nil
} }
// UpdateBlob cannot handle the status change and version increase, for handling blob status change, please call // UpdateBlob cannot handle the status change and version increase, for handling blob status change, please call

View File

@ -169,20 +169,26 @@ func (suite *DaoTestSuite) TestUpdateBlobStatus() {
count, err := suite.dao.UpdateBlobStatus(ctx, blob) count, err := suite.dao.UpdateBlobStatus(ctx, blob)
suite.Nil(err) suite.Nil(err)
suite.Equal(int64(0), count) suite.Equal(int64(0), count)
blob, err = suite.dao.GetBlobByDigest(ctx, digest)
if suite.Nil(err) {
suite.Equal(int64(0), blob.Version)
suite.Equal(models.StatusNone, blob.Status)
}
blob.Status = models.StatusDelete blob.Status = models.StatusDelete
count, err = suite.dao.UpdateBlobStatus(ctx, blob) count, err = suite.dao.UpdateBlobStatus(ctx, blob)
suite.Nil(err) suite.Nil(err)
suite.Equal(int64(1), count) suite.Equal(int64(1), count)
blob.Status = models.StatusDeleting
count, err = suite.dao.UpdateBlobStatus(ctx, blob)
suite.Nil(err)
suite.Equal(int64(1), count)
blob.Status = models.StatusDeleteFailed
count, err = suite.dao.UpdateBlobStatus(ctx, blob)
suite.Nil(err)
suite.Equal(int64(1), count)
blob, err = suite.dao.GetBlobByDigest(ctx, digest) blob, err = suite.dao.GetBlobByDigest(ctx, digest)
if suite.Nil(err) { if suite.Nil(err) {
suite.Equal(int64(1), blob.Version) suite.Equal(int64(3), blob.Version)
suite.Equal(models.StatusDelete, blob.Status) suite.Equal(models.StatusDeleteFailed, blob.Status)
} }
} }

View File

@ -16,6 +16,7 @@ package blob
import ( import (
"context" "context"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/blob/dao" "github.com/goharbor/harbor/src/pkg/blob/dao"
"github.com/goharbor/harbor/src/pkg/blob/models" "github.com/goharbor/harbor/src/pkg/blob/models"
) )
@ -121,6 +122,10 @@ func (m *manager) Update(ctx context.Context, blob *Blob) error {
} }
func (m *manager) UpdateBlobStatus(ctx context.Context, blob *models.Blob) (int64, error) { func (m *manager) UpdateBlobStatus(ctx context.Context, blob *models.Blob) (int64, error) {
_, exist := models.StatusMap[blob.Status]
if !exist {
return -1, errors.New(nil).WithMessage("cannot update blob status, as the status is unknown. digest: %s, status: %s", blob.Digest, blob.Status)
}
return m.dao.UpdateBlobStatus(ctx, blob) return m.dao.UpdateBlobStatus(ctx, blob)
} }

View File

@ -283,9 +283,22 @@ func (suite *ManagerTestSuite) TestUpdateStatus() {
blob, err := Mgr.Get(ctx, digest) blob, err := Mgr.Get(ctx, digest)
if suite.Nil(err) { if suite.Nil(err) {
blob.Status = models.StatusDelete
_, err := Mgr.UpdateBlobStatus(ctx, blob) blob.Status = "unknown"
count, err := Mgr.UpdateBlobStatus(ctx, blob)
suite.NotNil(err)
suite.Equal(int64(-1), count)
// StatusNone cannot be updated to StatusDeleting
blob.Status = models.StatusDeleting
count, err = Mgr.UpdateBlobStatus(ctx, blob)
suite.Nil(err) suite.Nil(err)
suite.Equal(int64(0), count)
blob.Status = models.StatusDelete
count, err = Mgr.UpdateBlobStatus(ctx, blob)
suite.Nil(err)
suite.Equal(int64(1), count)
{ {
blob, err := Mgr.Get(ctx, digest) blob, err := Mgr.Get(ctx, digest)

View File

@ -11,6 +11,7 @@ import (
"github.com/goharbor/harbor/src/server/middleware" "github.com/goharbor/harbor/src/server/middleware"
"github.com/goharbor/harbor/src/server/middleware/requestid" "github.com/goharbor/harbor/src/server/middleware/requestid"
"net/http" "net/http"
"time"
) )
// HeadBlobMiddleware intercept the head blob request // HeadBlobMiddleware intercept the head blob request
@ -40,12 +41,21 @@ func handleHead(req *http.Request) error {
switch bb.Status { switch bb.Status {
case blob_models.StatusNone, blob_models.StatusDelete: case blob_models.StatusNone, blob_models.StatusDelete:
err := blob.Ctl.Touch(req.Context(), bb) if err := blob.Ctl.Touch(req.Context(), bb); err != nil {
if err != nil {
log.Errorf("failed to update blob: %s status to StatusNone, error:%v", blobInfo.Digest, err) log.Errorf("failed to update blob: %s status to StatusNone, error:%v", blobInfo.Digest, err)
return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", req.Header.Get(requestid.HeaderXRequestID))) return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", req.Header.Get(requestid.HeaderXRequestID)))
} }
case blob_models.StatusDeleting, blob_models.StatusDeleteFailed: case blob_models.StatusDeleting:
now := time.Now().UTC()
// if the deleting exceed 2 hours, marks the blob as StatusDeleteFailed and gives a 404, so client can push it again
if now.Sub(bb.UpdateTime) > time.Duration(BlobDeleteingTimeWindow)*time.Hour {
if err := blob.Ctl.Fail(req.Context(), bb); err != nil {
log.Errorf("failed to update blob: %s status to StatusDeleteFailed, error:%v", blobInfo.Digest, err)
return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", req.Header.Get(requestid.HeaderXRequestID)))
}
}
return errors.New(nil).WithMessage(fmt.Sprintf("the asking blob is delete failed, mark it as non existing, request id: %s", req.Header.Get(requestid.HeaderXRequestID))).WithCode(errors.NotFoundCode)
case blob_models.StatusDeleteFailed:
return errors.New(nil).WithMessage(fmt.Sprintf("the asking blob is in GC, mark it as non existing, request id: %s", req.Header.Get(requestid.HeaderXRequestID))).WithCode(errors.NotFoundCode) return errors.New(nil).WithMessage(fmt.Sprintf("the asking blob is in GC, mark it as non existing, request id: %s", req.Header.Get(requestid.HeaderXRequestID))).WithCode(errors.NotFoundCode)
default: default:
return errors.New(nil).WithMessage(fmt.Sprintf("wrong blob status, %s", bb.Status)) return errors.New(nil).WithMessage(fmt.Sprintf("wrong blob status, %s", bb.Status))

View File

@ -2,13 +2,18 @@ package blob
import ( import (
"fmt" "fmt"
"github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/blob/models" "github.com/goharbor/harbor/src/pkg/blob/models"
"github.com/goharbor/harbor/src/server/middleware/requestid" "github.com/goharbor/harbor/src/server/middleware/requestid"
"net/http" "net/http"
"time"
) )
// BlobDeleteingTimeWindow is the time window used in GC to reserve blobs
const BlobDeleteingTimeWindow = 2
// probeBlob handles config/layer and manifest status in the PUT Blob & Manifest middleware, and update the status before it passed into proxy(distribution). // probeBlob handles config/layer and manifest status in the PUT Blob & Manifest middleware, and update the status before it passed into proxy(distribution).
func probeBlob(r *http.Request, digest string) error { func probeBlob(r *http.Request, digest string) error {
logger := log.G(r.Context()) logger := log.G(r.Context())
@ -24,14 +29,22 @@ func probeBlob(r *http.Request, digest string) error {
switch bb.Status { switch bb.Status {
case models.StatusNone, models.StatusDelete, models.StatusDeleteFailed: case models.StatusNone, models.StatusDelete, models.StatusDeleteFailed:
err := blobController.Touch(r.Context(), bb) if err := blobController.Touch(r.Context(), bb); err != nil {
if err != nil {
logger.Errorf("failed to update blob: %s status to StatusNone, error:%v", bb.Digest, err) logger.Errorf("failed to update blob: %s status to StatusNone, error:%v", bb.Digest, err)
return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", r.Header.Get(requestid.HeaderXRequestID))) return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", r.Header.Get(requestid.HeaderXRequestID)))
} }
case models.StatusDeleting: case models.StatusDeleting:
logger.Warningf(fmt.Sprintf("the asking blob is in GC, mark it as non existing, request id: %s", r.Header.Get(requestid.HeaderXRequestID))) now := time.Now().UTC()
return errors.New(nil).WithMessage(fmt.Sprintf("the asking blob is in GC, mark it as non existing, request id: %s", r.Header.Get(requestid.HeaderXRequestID))).WithCode(errors.NotFoundCode) // if the deleting exceed 2 hours, marks the blob as StatusDeleteFailed
if now.Sub(bb.UpdateTime) > time.Duration(BlobDeleteingTimeWindow)*time.Hour {
if err := blob.Ctl.Fail(r.Context(), bb); err != nil {
log.Errorf("failed to update blob: %s status to StatusDeleteFailed, error:%v", bb.Digest, err)
return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", r.Header.Get(requestid.HeaderXRequestID)))
}
// StatusDeleteFailed => StatusNone, and then let the proxy to handle manifest upload
return probeBlob(r, digest)
}
return errors.New(nil).WithMessage(fmt.Sprintf("the asking blob is delete failed, mark it as non existing, request id: %s", r.Header.Get(requestid.HeaderXRequestID))).WithCode(errors.NotFoundCode)
default: default:
return nil return nil
} }

View File

@ -145,6 +145,20 @@ func (_m *Controller) Exist(ctx context.Context, digest string, options ...blob.
return r0, r1 return r0, r1
} }
// Fail provides a mock function with given fields: ctx, _a1
func (_m *Controller) Fail(ctx context.Context, _a1 *models.Blob) error {
ret := _m.Called(ctx, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *models.Blob) error); ok {
r0 = rf(ctx, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// FindMissingAssociationsForProject provides a mock function with given fields: ctx, projectID, blobs // FindMissingAssociationsForProject provides a mock function with given fields: ctx, projectID, blobs
func (_m *Controller) FindMissingAssociationsForProject(ctx context.Context, projectID int64, blobs []*models.Blob) ([]*models.Blob, error) { func (_m *Controller) FindMissingAssociationsForProject(ctx context.Context, projectID int64, blobs []*models.Blob) ([]*models.Blob, error) {
ret := _m.Called(ctx, projectID, blobs) ret := _m.Called(ctx, projectID, blobs)

View File

@ -221,13 +221,13 @@ func (_m *Manager) UpdateBlobStatus(ctx context.Context, _a1 *models.Blob) (int6
return r0, r1 return r0, r1
} }
// UselessBlobs provides a mock function with given fields: ctx, timeWindow // UselessBlobs provides a mock function with given fields: ctx, timeWindowHours
func (_m *Manager) UselessBlobs(ctx context.Context, timeWindow int64) ([]*models.Blob, error) { func (_m *Manager) UselessBlobs(ctx context.Context, timeWindowHours int64) ([]*models.Blob, error) {
ret := _m.Called(ctx, timeWindow) ret := _m.Called(ctx, timeWindowHours)
var r0 []*models.Blob var r0 []*models.Blob
if rf, ok := ret.Get(0).(func(context.Context, int64) []*models.Blob); ok { if rf, ok := ret.Get(0).(func(context.Context, int64) []*models.Blob); ok {
r0 = rf(ctx, timeWindow) r0 = rf(ctx, timeWindowHours)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).([]*models.Blob) r0 = ret.Get(0).([]*models.Blob)
@ -236,7 +236,7 @@ func (_m *Manager) UselessBlobs(ctx context.Context, timeWindow int64) ([]*model
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
r1 = rf(ctx, timeWindow) r1 = rf(ctx, timeWindowHours)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
} }