checkpoint, filedb cache, wait, migrations

This commit is contained in:
sawka 2024-02-13 12:56:16 -05:00
parent eab8e3be02
commit e2ff745f55
5 changed files with 198 additions and 83 deletions

View File

@ -824,7 +824,7 @@ func main() {
return return
} }
if len(os.Args) >= 2 && strings.HasPrefix(os.Args[1], "--migrate") { if len(os.Args) >= 2 && strings.HasPrefix(os.Args[1], "--migrate") {
err := sstore.MigrateCommandOpts(os.Args[1:]) err := sstore.MainDBMigrateCommandOpts(os.Args[1:])
if err != nil { if err != nil {
log.Printf("[error] migrate cmd: %v\n", err) log.Printf("[error] migrate cmd: %v\n", err)
} }
@ -836,7 +836,7 @@ func main() {
return return
} }
GlobalAuthKey = authKey GlobalAuthKey = authKey
err = sstore.TryMigrateUp() err = sstore.MainDBTryMigrateUp()
if err != nil { if err != nil {
log.Printf("[error] migrate up: %v\n", err) log.Printf("[error] migrate up: %v\n", err)
return return

View File

@ -0,0 +1 @@
DROP TABLE file;

View File

@ -0,0 +1,9 @@
CREATE TABLE file (
screenid varchar(36) NOT NULL,
lineid varchar(36) NOT NULL,
filename varchar(200) NOT NULL,
filetype varchar(20) NOT NULL,
diskfilename varchar(250) NOT NULL,
contents blob NOT NULL,
PRIMARY KEY (screenid, lineid, filename)
);

View File

@ -13,7 +13,6 @@ import (
"os" "os"
"path" "path"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -23,70 +22,159 @@ import (
"github.com/wavetermdev/waveterm/wavesrv/pkg/scbase" "github.com/wavetermdev/waveterm/wavesrv/pkg/scbase"
) )
const MaxDBFileSize = 10 * 1024 const MaxFileDBInlineFileSize = 10 * 1024
var screenDirLock = &sync.Mutex{} var screenDirLock = &sync.Mutex{}
var screenDirCache = make(map[string]string) // locked with screenDirLock var screenDirCache = make(map[string]string) // locked with screenDirLock
var globalDBFileCache = makeDBFileCache() var globalFileDBCache = makeFileDBCache()
type dbFileCacheEntry struct { type fileDBCacheEntry struct {
DBLock *sync.Mutex ScreenId string
DB *sqlx.DB CVar *sync.Cond // condition variable to lock entry fields and wait on InUse flag
InUse atomic.Bool DB *sqlx.DB // can be nil (when not in use), will need to be reopend on access
InUse bool
Migrated bool // only try to migrate the DB once per run
Waiters int
LastUse time.Time
OpenErr error // we cache open errors (and return them on GetDB)
} }
type DBFileCache struct { // we store all screens in this cache (added on demand)
// when not in use we can close the DB object
type FileDBCache struct {
Lock *sync.Mutex Lock *sync.Mutex
Cache map[string]*dbFileCacheEntry Cache map[string]*fileDBCacheEntry // key = screenid
} }
func makeDBFileCache() *DBFileCache { // will create an entry if it doesn't exist
return &DBFileCache{ func (dbc *FileDBCache) GetEntry(screenId string) *fileDBCacheEntry {
Lock: &sync.Mutex{}, dbc.Lock.Lock()
Cache: make(map[string]*dbFileCacheEntry), defer dbc.Lock.Unlock()
} entry := dbc.Cache[screenId]
}
func (c *DBFileCache) GetDB(screenId string) (*sqlx.DB, error) {
c.Lock.Lock()
defer c.Lock.Unlock()
entry := c.Cache[screenId]
if entry != nil { if entry != nil {
entry.DBLock.Lock() return entry
entry.InUse.Store(true)
return entry.DB, nil
} }
_, err := EnsureScreenDir(screenId) entry = &fileDBCacheEntry{
ScreenId: screenId,
CVar: sync.NewCond(&sync.Mutex{}),
DB: nil,
Migrated: false,
InUse: false,
Waiters: 0,
LastUse: time.Time{},
}
dbc.Cache[screenId] = entry
return entry
}
func makeFileDBCache() *FileDBCache {
return &FileDBCache{
Lock: &sync.Mutex{},
Cache: make(map[string]*fileDBCacheEntry),
}
}
func MakeFileDBUrl(screenId string) (string, error) {
screenDir, err := EnsureScreenDir(screenId)
if err != nil {
return "", err
}
fileDBName := path.Join(screenDir, "filedb.db")
return fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", fileDBName), nil
}
func MakeFileDB(screenId string) (*sqlx.DB, error) {
dbUrl, err := MakeFileDBUrl(screenId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return nil, nil return sqlx.Open("sqlite3", dbUrl)
} }
func (c *DBFileCache) ReleaseDB(screenId string, db *sqlx.DB) { // will close the DB if not in use (and no waiters)
entry := c.Cache[screenId] // returns (closed, closeErr)
if entry == nil { // if we cannot close the DB (in use), then we return (false, nil)
// this shouldn't happen (error) // if DB is already closed, we'll return (true, nil)
log.Printf("[db] error missing cache entry for dbfile %s", screenId) // if there is an error closing the DB, we'll return (true, err)
return // on successful close returns (true, nil)
func (entry *fileDBCacheEntry) CloseDB() (bool, error) {
entry.CVar.L.Lock()
defer entry.CVar.L.Unlock()
if entry.DB == nil {
return true, nil
} }
entry.DBLock.Unlock() if entry.InUse || entry.Waiters > 0 {
entry.InUse.Store(false) return false, nil
// noop for now }
err := entry.DB.Close()
entry.DB = nil
return true, err
}
// will create DB if doesn't exist
// will Wait() on CVar if InUse
// updates Waiters appropriately
func (entry *fileDBCacheEntry) GetDB() (*sqlx.DB, error) {
entry.CVar.L.Lock()
defer entry.CVar.L.Unlock()
if entry.OpenErr != nil {
return nil, entry.OpenErr
}
entry.Waiters++
for {
if entry.InUse {
entry.CVar.Wait()
continue
}
break
}
entry.Waiters--
if !entry.Migrated {
FileDBMigrateUp(entry.ScreenId)
entry.Migrated = true
}
if entry.DB == nil {
db, err := MakeFileDB(entry.ScreenId)
if err != nil {
entry.OpenErr = err
return nil, err
}
entry.DB = db
}
entry.InUse = true
entry.LastUse = time.Now()
return entry.DB, nil
}
func (entry *fileDBCacheEntry) ReleaseDB() {
entry.CVar.L.Lock()
defer entry.CVar.L.Unlock()
entry.InUse = false
entry.CVar.Signal()
}
func (c *FileDBCache) GetDB(screenId string) (*sqlx.DB, error) {
entry := c.GetEntry(screenId)
return entry.GetDB()
}
func (c *FileDBCache) ReleaseDB(screenId string, db *sqlx.DB) {
entry := c.Cache[screenId]
entry.ReleaseDB()
} }
// fulfills the txwrap DBGetter interface // fulfills the txwrap DBGetter interface
type DBFileGetter struct { type FileDBGetter struct {
ScreenId string ScreenId string
} }
func (g DBFileGetter) GetDB(ctx context.Context) (*sqlx.DB, error) { func (g FileDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) {
return globalDBFileCache.GetDB(g.ScreenId) return globalFileDBCache.GetDB(g.ScreenId)
} }
func (g DBFileGetter) ReleaseDB(db *sqlx.DB) { func (g FileDBGetter) ReleaseDB(db *sqlx.DB) {
globalDBFileCache.ReleaseDB(g.ScreenId, db) globalFileDBCache.ReleaseDB(g.ScreenId, db)
} }
func TryConvertPtyFile(ctx context.Context, screenId string, lineId string) error { func TryConvertPtyFile(ctx context.Context, screenId string, lineId string) error {
@ -94,7 +182,7 @@ func TryConvertPtyFile(ctx context.Context, screenId string, lineId string) erro
if err != nil { if err != nil {
return fmt.Errorf("convert ptyfile, cannot stat: %w", err) return fmt.Errorf("convert ptyfile, cannot stat: %w", err)
} }
if stat.DataSize > MaxDBFileSize { if stat.DataSize > MaxFileDBInlineFileSize {
return nil return nil
} }
return nil return nil

View File

@ -22,27 +22,41 @@ import (
"github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4"
) )
const MaxMigration = 31 const MaxMainDBMigration = 31
const MigratePrimaryScreenVersion = 9 const MigratePrimaryScreenVersion = 9
const CmdScreenSpecialMigration = 13 const CmdScreenSpecialMigration = 13
const CmdLineSpecialMigration = 20 const CmdLineSpecialMigration = 20
const RISpecialMigration = 30 const RISpecialMigration = 30
func MakeMigrate() (*migrate.Migrate, error) { func MakeMainDBMigrate() (*migrate.Migrate, error) {
fsVar, err := iofs.New(sh2db.MigrationFS, "migrations") fsVar, err := iofs.New(sh2db.MigrationFS, "migrations")
if err != nil { if err != nil {
return nil, fmt.Errorf("opening iofs: %w", err) return nil, fmt.Errorf("opening iofs: %w", err)
} }
// migrationPathUrl := fmt.Sprintf("file://%s", path.Join(wd, "db", "migrations"))
dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName()) dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName())
m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl)
// m, err := migrate.New(migrationPathUrl, dbUrl)
if err != nil { if err != nil {
return nil, fmt.Errorf("making migration db[%s]: %w", GetDBName(), err) return nil, fmt.Errorf("making migration db[%s]: %w", GetDBName(), err)
} }
return m, nil return m, nil
} }
func MakeFileDBMigrate(screenId string) (*migrate.Migrate, error) {
fsVar, err := iofs.New(sh2db.MigrationFS, "filedb-migrations")
if err != nil {
return nil, fmt.Errorf("opening iofs: %w", err)
}
dbUrl, err := MakeFileDBUrl(screenId)
if err != nil {
return nil, fmt.Errorf("making file db url for screenid %s: %w", screenId, err)
}
m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl)
if err != nil {
return nil, fmt.Errorf("making migration db[%s]: %w", dbUrl, err)
}
return m, nil
}
func copyFile(srcFile string, dstFile string, notFoundOk bool) error { func copyFile(srcFile string, dstFile string, notFoundOk bool) error {
if srcFile == dstFile { if srcFile == dstFile {
return fmt.Errorf("cannot copy %s to itself", srcFile) return fmt.Errorf("cannot copy %s to itself", srcFile)
@ -95,11 +109,7 @@ func MigrateUpStep(m *migrate.Migrate, newVersion uint) error {
return nil return nil
} }
func MigrateUp(targetVersion uint) error { func MigrateUp(m *migrate.Migrate, targetVersion uint) error {
m, err := MakeMigrate()
if err != nil {
return err
}
curVersion, dirty, err := MigrateVersion(m) curVersion, dirty, err := MigrateVersion(m)
if dirty { if dirty {
return fmt.Errorf("cannot migrate up, database is dirty") return fmt.Errorf("cannot migrate up, database is dirty")
@ -135,11 +145,7 @@ func MigrateUp(targetVersion uint) error {
// returns curVersion, dirty, error // returns curVersion, dirty, error
func MigrateVersion(m *migrate.Migrate) (uint, bool, error) { func MigrateVersion(m *migrate.Migrate) (uint, bool, error) {
if m == nil { if m == nil {
var err error return 0, false, fmt.Errorf("migrate object is nil")
m, err = MakeMigrate()
if err != nil {
return 0, false, err
}
} }
curVersion, dirty, err := m.Version() curVersion, dirty, err := m.Version()
if err == migrate.ErrNilVersion { if err == migrate.ErrNilVersion {
@ -148,52 +154,60 @@ func MigrateVersion(m *migrate.Migrate) (uint, bool, error) {
return curVersion, dirty, err return curVersion, dirty, err
} }
func MigrateDown() error { func MigrateDown(m *migrate.Migrate) error {
m, err := MakeMigrate() err := m.Down()
if err != nil {
return err
}
err = m.Down()
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func MigrateGoto(n uint) error { func MigrateGoto(m *migrate.Migrate, n uint) error {
curVersion, _, _ := MigrateVersion(nil) curVersion, _, _ := MigrateVersion(m)
if curVersion == n { if curVersion == n {
return nil return nil
} }
if curVersion < n { if curVersion < n {
return MigrateUp(n) return MigrateUp(m, n)
} }
m, err := MakeMigrate() err := m.Migrate(n)
if err != nil {
return err
}
err = m.Migrate(n)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func TryMigrateUp() error { func MainDBTryMigrateUp() error {
curVersion, _, _ := MigrateVersion(nil) m, err := MakeMainDBMigrate()
if err != nil {
return fmt.Errorf("error trying to run main-db migrations: %w", err)
}
return TryMigrateUp(m)
}
func FileDBMigrateUp(screenId string) error {
m, err := MakeFileDBMigrate(screenId)
if err != nil {
return fmt.Errorf("error trying to run file-db migrations for screenid %s: %w", screenId, err)
}
return m.Up()
}
func TryMigrateUp(m *migrate.Migrate) error {
curVersion, _, _ := MigrateVersion(m)
log.Printf("[db] db version = %d\n", curVersion) log.Printf("[db] db version = %d\n", curVersion)
if curVersion >= MaxMigration { if curVersion >= MaxMainDBMigration {
return nil return nil
} }
err := MigrateUp(MaxMigration) err := MigrateUp(m, MaxMainDBMigration)
if err != nil { if err != nil {
return err return err
} }
return MigratePrintVersion() return MigratePrintVersion(m)
} }
func MigratePrintVersion() error { func MigratePrintVersion(m *migrate.Migrate) error {
version, dirty, err := MigrateVersion(nil) version, dirty, err := MigrateVersion(m)
if err != nil { if err != nil {
return fmt.Errorf("error getting db version: %v", err) return fmt.Errorf("error getting db version: %v", err)
} }
@ -204,22 +218,25 @@ func MigratePrintVersion() error {
return nil return nil
} }
func MigrateCommandOpts(opts []string) error { func MainDBMigrateCommandOpts(opts []string) error {
var err error m, err := MakeMainDBMigrate()
if err != nil {
return fmt.Errorf("error trying to run main-db migrations: %w", err)
}
if opts[0] == "--migrate-up" { if opts[0] == "--migrate-up" {
fmt.Printf("migrate-up %v\n", GetDBName()) fmt.Printf("migrate-up %v\n", GetDBName())
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
err = MigrateUp(MaxMigration) err = MigrateUp(m, MaxMainDBMigration)
} else if opts[0] == "--migrate-down" { } else if opts[0] == "--migrate-down" {
fmt.Printf("migrate-down %v\n", GetDBName()) fmt.Printf("migrate-down %v\n", GetDBName())
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
err = MigrateDown() err = MigrateDown(m)
} else if opts[0] == "--migrate-goto" { } else if opts[0] == "--migrate-goto" {
n, err := strconv.Atoi(opts[1]) n, err := strconv.Atoi(opts[1])
if err == nil { if err == nil {
fmt.Printf("migrate-goto %v => %d\n", GetDBName(), n) fmt.Printf("migrate-goto %v => %d\n", GetDBName(), n)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
err = MigrateGoto(uint(n)) err = MigrateGoto(m, uint(n))
} }
} else { } else {
err = fmt.Errorf("invalid migration command") err = fmt.Errorf("invalid migration command")
@ -230,5 +247,5 @@ func MigrateCommandOpts(opts []string) error {
if err != nil { if err != nil {
return err return err
} }
return MigratePrintVersion() return MigratePrintVersion(m)
} }