feat: implement beego session provider

Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
chlins 2022-03-17 14:53:20 +08:00
parent 21effeeb5b
commit dbb8859ad8
8 changed files with 414 additions and 57 deletions

View File

@ -99,7 +99,7 @@ func (c *controller) EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagNam
// search the digest in cache and query with trimmed digest
var trimmedDigest string
err := c.cache.Fetch(ctx, TrimmedManifestlist+art.Digest, &trimmedDigest)
if err == cache.ErrNotFound {
if errors.Is(err, cache.ErrNotFound) {
// skip to update digest, continue
} else if err != nil {
// for other error, return
@ -171,7 +171,7 @@ func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo,
err = c.cache.Fetch(ctx, manifestListKey(art.Repository, string(desc.Digest)), &content)
if err != nil {
if err == cache.ErrNotFound {
if errors.Is(err, cache.ErrNotFound) {
log.Debugf("Digest is not found in manifest list cache, key=cache:%v", manifestListKey(art.Repository, string(desc.Digest)))
} else {
log.Errorf("Failed to get manifest list from cache, error: %v", err)

View File

@ -16,24 +16,20 @@ package main
import (
"context"
"encoding/gob"
"flag"
"fmt"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/astaxie/beego"
_ "github.com/astaxie/beego/session/redis"
_ "github.com/astaxie/beego/session/redis_sentinel"
"github.com/goharbor/harbor/src/core/session"
"github.com/goharbor/harbor/src/common/dao"
common_http "github.com/goharbor/harbor/src/common/http"
commonmodels "github.com/goharbor/harbor/src/common/models"
configCtl "github.com/goharbor/harbor/src/controller/config"
_ "github.com/goharbor/harbor/src/controller/event/handler"
"github.com/goharbor/harbor/src/controller/health"
@ -125,57 +121,11 @@ func main() {
if len(redisURL) > 0 {
u, err := url.Parse(redisURL)
if err != nil {
panic("bad _REDIS_URL:" + redisURL)
panic("bad _REDIS_URL")
}
gob.Register(commonmodels.User{})
if u.Scheme == "redis+sentinel" {
ps := strings.Split(u.Path, "/")
if len(ps) < 2 {
panic("bad redis sentinel url: no master name")
}
ss := make([]string, 5)
ss[0] = strings.Join(strings.Split(u.Host, ","), ";") // host
ss[1] = "100" // pool
if u.User != nil {
password, isSet := u.User.Password()
if isSet {
ss[2] = password
}
}
if len(ps) > 2 {
db, err := strconv.Atoi(ps[2])
if err != nil {
panic("bad redis sentinel url: bad db")
}
if db != 0 {
ss[3] = ps[2]
}
}
ss[4] = ps[1] // monitor name
beego.BConfig.WebConfig.Session.SessionProvider = "redis_sentinel"
beego.BConfig.WebConfig.Session.SessionProviderConfig = strings.Join(ss, ",")
} else {
ss := make([]string, 5)
ss[0] = u.Host // host
ss[1] = "100" // pool
if u.User != nil {
password, isSet := u.User.Password()
if isSet {
ss[2] = password
}
}
if len(u.Path) > 1 {
if _, err := strconv.Atoi(u.Path[1:]); err != nil {
panic("bad redis url: bad db")
}
ss[3] = u.Path[1:]
}
ss[4] = u.Query().Get("idle_timeout_seconds")
beego.BConfig.WebConfig.Session.SessionProvider = "redis"
beego.BConfig.WebConfig.Session.SessionProviderConfig = strings.Join(ss, ",")
}
beego.BConfig.WebConfig.Session.SessionProvider = session.HarborProviderName
beego.BConfig.WebConfig.Session.SessionProviderConfig = redisURL
log.Info("initializing cache ...")
if err := cache.Initialize(u.Scheme, redisURL); err != nil {

66
src/core/session/codec.go Normal file
View File

@ -0,0 +1,66 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package session
import (
"encoding/gob"
"github.com/astaxie/beego/session"
commonmodels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/errors"
)
func init() {
gob.Register(commonmodels.User{})
}
var (
// codec the default codec for the cache
codec cache.Codec = &gobCodec{}
)
type gobCodec struct{}
func (*gobCodec) Encode(v interface{}) ([]byte, error) {
if vm, ok := v.(map[interface{}]interface{}); ok {
return session.EncodeGob(vm)
}
return nil, errors.Errorf("object type invalid, %#v", v)
}
func (*gobCodec) Decode(data []byte, v interface{}) error {
vm, err := session.DecodeGob(data)
if err != nil {
return err
}
switch in := v.(type) {
case map[interface{}]interface{}:
for k, v := range vm {
in[k] = v
}
case *map[interface{}]interface{}:
m := *in
for k, v := range vm {
m[k] = v
}
default:
return errors.Errorf("object type invalid, %#v", v)
}
return nil
}

View File

@ -0,0 +1,41 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package session
import (
"testing"
"github.com/stretchr/testify/assert"
)
type User struct {
User string
Pass string
}
func TestCodec(t *testing.T) {
u := &User{User: "admin", Pass: "123456"}
m := make(map[interface{}]interface{})
m["user"] = u
c, err := codec.Encode(m)
assert.NoError(t, err, "encode should not error")
v := make(map[interface{}]interface{})
err = codec.Decode(c, &v)
assert.NoError(t, err, "decode should not error")
user, exist := v["user"]
assert.True(t, exist, "user should exist")
assert.True(t, assert.ObjectsAreEqualValues(u, user), "user should equal")
}

176
src/core/session/session.go Normal file
View File

@ -0,0 +1,176 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package session
import (
"context"
"net/http"
"strings"
"sync"
"time"
"github.com/astaxie/beego/session"
goredis "github.com/go-redis/redis/v8"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/cache/redis"
"github.com/goharbor/harbor/src/lib/log"
)
const (
// HarborProviderName is the harbor session provider name
HarborProviderName = "harbor"
)
var harborpder = &Provider{}
// SessionStore redis session store
type SessionStore struct {
c cache.Cache
sid string
lock sync.RWMutex
values map[interface{}]interface{}
maxlifetime int64
}
// Set value in redis session
func (rs *SessionStore) Set(key, value interface{}) error {
rs.lock.Lock()
defer rs.lock.Unlock()
rs.values[key] = value
return nil
}
// Get value in redis session
func (rs *SessionStore) Get(key interface{}) interface{} {
rs.lock.RLock()
defer rs.lock.RUnlock()
if v, ok := rs.values[key]; ok {
return v
}
return nil
}
// Delete value in redis session
func (rs *SessionStore) Delete(key interface{}) error {
rs.lock.Lock()
defer rs.lock.Unlock()
delete(rs.values, key)
return nil
}
// Flush clear all values in redis session
func (rs *SessionStore) Flush() error {
rs.lock.Lock()
defer rs.lock.Unlock()
rs.values = make(map[interface{}]interface{})
return nil
}
// SessionID get redis session id
func (rs *SessionStore) SessionID() string {
return rs.sid
}
// SessionRelease save session values to redis
func (rs *SessionStore) SessionRelease(w http.ResponseWriter) {
b, err := session.EncodeGob(rs.values)
if err != nil {
return
}
if rdb, ok := rs.c.(*redis.Cache); ok {
cmd := rdb.Client.Set(context.TODO(), rs.sid, string(b), time.Duration(rs.maxlifetime))
if cmd.Err() != nil {
log.Debugf("release session error: %v", err)
}
}
}
// Provider redis session provider
type Provider struct {
maxlifetime int64
c cache.Cache
}
// SessionInit init redis session
func (rp *Provider) SessionInit(maxlifetime int64, url string) (err error) {
rp.maxlifetime = maxlifetime * int64(time.Second)
rp.c, err = redis.New(cache.Options{Address: url, Codec: codec})
if err != nil {
return err
}
return rp.c.Ping(context.TODO())
}
// SessionRead read redis session by sid
func (rp *Provider) SessionRead(sid string) (session.Store, error) {
kv := make(map[interface{}]interface{})
err := rp.c.Fetch(context.TODO(), sid, &kv)
if err != nil && !strings.Contains(err.Error(), goredis.Nil.Error()) {
return nil, err
}
rs := &SessionStore{c: rp.c, sid: sid, values: kv, maxlifetime: rp.maxlifetime}
return rs, nil
}
// SessionExist check redis session exist by sid
func (rp *Provider) SessionExist(sid string) bool {
return rp.c.Contains(context.TODO(), sid)
}
// SessionRegenerate generate new sid for redis session
func (rp *Provider) SessionRegenerate(oldsid, sid string) (session.Store, error) {
ctx := context.TODO()
if !rp.SessionExist(oldsid) {
rp.c.Save(ctx, sid, "", time.Duration(rp.maxlifetime))
} else {
if rdb, ok := rp.c.(*redis.Cache); ok {
// redis has rename command
rdb.Rename(ctx, oldsid, sid)
rdb.Expire(ctx, sid, time.Duration(rp.maxlifetime))
} else {
kv := make(map[interface{}]interface{})
err := rp.c.Fetch(ctx, sid, &kv)
if err != nil && !strings.Contains(err.Error(), goredis.Nil.Error()) {
return nil, err
}
rp.c.Delete(ctx, oldsid)
rp.c.Save(ctx, sid, kv)
}
}
return rp.SessionRead(sid)
}
// SessionDestroy delete redis session by id
func (rp *Provider) SessionDestroy(sid string) error {
return rp.c.Delete(context.TODO(), sid)
}
// SessionGC Implement method, no used.
func (rp *Provider) SessionGC() {
}
// SessionAll return all activeSession
func (rp *Provider) SessionAll() int {
return 0
}
func init() {
session.Register(HarborProviderName, harborpder)
}

View File

@ -0,0 +1,113 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package session
import (
"testing"
"github.com/astaxie/beego/session"
"github.com/stretchr/testify/suite"
)
type sessionTestSuite struct {
suite.Suite
provider session.Provider
}
func (s *sessionTestSuite) SetupTest() {
var err error
s.provider, err = session.GetProvider("harbor")
s.NoError(err, "should get harbor provider")
s.NotNil(s.provider, "provider should not nil")
err = s.provider.SessionInit(3600, "redis://127.0.0.1:6379/0")
s.NoError(err, "session init should not error")
}
func (s *sessionTestSuite) TestSessionRead() {
store, err := s.provider.SessionRead("session-001")
s.NoError(err, "session read should not error")
s.NotNil(store)
}
func (s *sessionTestSuite) TestSessionExist() {
// prepare session
store, err := s.provider.SessionRead("session-001")
s.NoError(err, "session read should not error")
s.NotNil(store)
store.SessionRelease(nil)
defer func() {
// clean session
err = s.provider.SessionDestroy("session-001")
s.NoError(err)
}()
exist := s.provider.SessionExist("session-001")
s.True(exist, "session-001 should exist")
exist = s.provider.SessionExist("session-002")
s.False(exist, "session-002 should not exist")
}
func (s *sessionTestSuite) TestSessionRegenerate() {
// prepare session
store, err := s.provider.SessionRead("session-001")
s.NoError(err, "session read should not error")
s.NotNil(store)
store.SessionRelease(nil)
defer func() {
// clean session
err = s.provider.SessionDestroy("session-001")
s.NoError(err)
err = s.provider.SessionDestroy("session-003")
s.NoError(err)
}()
_, err = s.provider.SessionRegenerate("session-001", "session-003")
s.NoError(err, "session regenerate should not error")
s.True(s.provider.SessionExist("session-003"))
s.False(s.provider.SessionExist("session-001"))
}
func (s *sessionTestSuite) TestSessionDestroy() {
// prepare session
store, err := s.provider.SessionRead("session-004")
s.NoError(err, "session read should not error")
s.NotNil(store)
store.SessionRelease(nil)
s.True(s.provider.SessionExist("session-004"), "session-004 should exist")
err = s.provider.SessionDestroy("session-004")
s.NoError(err, "session destroy should not error")
s.False(s.provider.SessionExist("session-004"), "session-004 should not exist")
}
func (s *sessionTestSuite) TestSessionGC() {
s.provider.SessionGC()
}
func (s *sessionTestSuite) TestSessionAll() {
c := s.provider.SessionAll()
s.Equal(0, c)
}
func TestSession(t *testing.T) {
suite.Run(t, &sessionTestSuite{})
}

View File

@ -41,3 +41,8 @@ func (*msgpackCodec) Encode(v interface{}) ([]byte, error) {
func (*msgpackCodec) Decode(data []byte, v interface{}) error {
return msgpack.Unmarshal(data, v)
}
// DefaultCodec returns default codec.
func DefaultCodec() Codec {
return codec
}

View File

@ -16,6 +16,7 @@ package redis
import (
"context"
"fmt"
"net/url"
"time"
@ -53,7 +54,8 @@ func (c *Cache) Fetch(ctx context.Context, key string, value interface{}) error
if err != nil {
// convert internal or Timeout error to be ErrNotFound
// so that the caller can continue working without breaking
return cache.ErrNotFound
// return cache.ErrNotFound
return fmt.Errorf("%w:%v", cache.ErrNotFound, err)
}
if err := c.opts.Codec.Decode(data, value); err != nil {
@ -91,6 +93,10 @@ func New(opts cache.Options) (cache.Cache, error) {
opts.Address = "redis://localhost:6379/0"
}
if opts.Codec == nil {
opts.Codec = cache.DefaultCodec()
}
u, err := url.Parse(opts.Address)
if err != nil {
return nil, err