From e2ff745f553cbd11431ef6111ad20af3d1dfdb9c Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 13 Feb 2024 12:56:16 -0500 Subject: [PATCH] checkpoint, filedb cache, wait, migrations --- wavesrv/cmd/main-server.go | 4 +- .../db/filedb-migrations/000001_init.down.sql | 1 + .../db/filedb-migrations/000001_init.up.sql | 9 + wavesrv/pkg/sstore/fileops.go | 168 +++++++++++++----- wavesrv/pkg/sstore/migrate.go | 99 ++++++----- 5 files changed, 198 insertions(+), 83 deletions(-) create mode 100644 wavesrv/db/filedb-migrations/000001_init.down.sql create mode 100644 wavesrv/db/filedb-migrations/000001_init.up.sql diff --git a/wavesrv/cmd/main-server.go b/wavesrv/cmd/main-server.go index 716e13a5c..384009a9c 100644 --- a/wavesrv/cmd/main-server.go +++ b/wavesrv/cmd/main-server.go @@ -824,7 +824,7 @@ func main() { return } if len(os.Args) >= 2 && strings.HasPrefix(os.Args[1], "--migrate") { - err := sstore.MigrateCommandOpts(os.Args[1:]) + err := sstore.MainDBMigrateCommandOpts(os.Args[1:]) if err != nil { log.Printf("[error] migrate cmd: %v\n", err) } @@ -836,7 +836,7 @@ func main() { return } GlobalAuthKey = authKey - err = sstore.TryMigrateUp() + err = sstore.MainDBTryMigrateUp() if err != nil { log.Printf("[error] migrate up: %v\n", err) return diff --git a/wavesrv/db/filedb-migrations/000001_init.down.sql b/wavesrv/db/filedb-migrations/000001_init.down.sql new file mode 100644 index 000000000..fb589a39b --- /dev/null +++ b/wavesrv/db/filedb-migrations/000001_init.down.sql @@ -0,0 +1 @@ +DROP TABLE file; \ No newline at end of file diff --git a/wavesrv/db/filedb-migrations/000001_init.up.sql b/wavesrv/db/filedb-migrations/000001_init.up.sql new file mode 100644 index 000000000..fac2fe910 --- /dev/null +++ b/wavesrv/db/filedb-migrations/000001_init.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE file ( + screenid varchar(36) NOT NULL, + lineid varchar(36) NOT NULL, + filename varchar(200) NOT NULL, + filetype varchar(20) NOT NULL, + diskfilename varchar(250) NOT NULL, + contents blob NOT NULL, + PRIMARY KEY (screenid, lineid, filename) +); \ No newline at end of file diff --git a/wavesrv/pkg/sstore/fileops.go b/wavesrv/pkg/sstore/fileops.go index a808359e9..656d41286 100644 --- a/wavesrv/pkg/sstore/fileops.go +++ b/wavesrv/pkg/sstore/fileops.go @@ -13,7 +13,6 @@ import ( "os" "path" "sync" - "sync/atomic" "time" "github.com/google/uuid" @@ -23,70 +22,159 @@ import ( "github.com/wavetermdev/waveterm/wavesrv/pkg/scbase" ) -const MaxDBFileSize = 10 * 1024 +const MaxFileDBInlineFileSize = 10 * 1024 var screenDirLock = &sync.Mutex{} var screenDirCache = make(map[string]string) // locked with screenDirLock -var globalDBFileCache = makeDBFileCache() +var globalFileDBCache = makeFileDBCache() -type dbFileCacheEntry struct { - DBLock *sync.Mutex - DB *sqlx.DB - InUse atomic.Bool +type fileDBCacheEntry struct { + ScreenId string + CVar *sync.Cond // condition variable to lock entry fields and wait on InUse flag + DB *sqlx.DB // can be nil (when not in use), will need to be reopend on access + InUse bool + Migrated bool // only try to migrate the DB once per run + Waiters int + LastUse time.Time + OpenErr error // we cache open errors (and return them on GetDB) } -type DBFileCache struct { +// we store all screens in this cache (added on demand) +// when not in use we can close the DB object +type FileDBCache struct { Lock *sync.Mutex - Cache map[string]*dbFileCacheEntry + Cache map[string]*fileDBCacheEntry // key = screenid } -func makeDBFileCache() *DBFileCache { - return &DBFileCache{ - Lock: &sync.Mutex{}, - Cache: make(map[string]*dbFileCacheEntry), - } -} - -func (c *DBFileCache) GetDB(screenId string) (*sqlx.DB, error) { - c.Lock.Lock() - defer c.Lock.Unlock() - entry := c.Cache[screenId] +// will create an entry if it doesn't exist +func (dbc *FileDBCache) GetEntry(screenId string) *fileDBCacheEntry { + dbc.Lock.Lock() + defer dbc.Lock.Unlock() + entry := dbc.Cache[screenId] if entry != nil { - entry.DBLock.Lock() - entry.InUse.Store(true) - return entry.DB, nil + return entry } - _, err := EnsureScreenDir(screenId) + entry = &fileDBCacheEntry{ + ScreenId: screenId, + CVar: sync.NewCond(&sync.Mutex{}), + DB: nil, + Migrated: false, + InUse: false, + Waiters: 0, + LastUse: time.Time{}, + } + dbc.Cache[screenId] = entry + return entry +} + +func makeFileDBCache() *FileDBCache { + return &FileDBCache{ + Lock: &sync.Mutex{}, + Cache: make(map[string]*fileDBCacheEntry), + } +} + +func MakeFileDBUrl(screenId string) (string, error) { + screenDir, err := EnsureScreenDir(screenId) + if err != nil { + return "", err + } + fileDBName := path.Join(screenDir, "filedb.db") + return fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", fileDBName), nil +} + +func MakeFileDB(screenId string) (*sqlx.DB, error) { + dbUrl, err := MakeFileDBUrl(screenId) if err != nil { return nil, err } - return nil, nil + return sqlx.Open("sqlite3", dbUrl) } -func (c *DBFileCache) ReleaseDB(screenId string, db *sqlx.DB) { - entry := c.Cache[screenId] - if entry == nil { - // this shouldn't happen (error) - log.Printf("[db] error missing cache entry for dbfile %s", screenId) - return +// will close the DB if not in use (and no waiters) +// returns (closed, closeErr) +// if we cannot close the DB (in use), then we return (false, nil) +// if DB is already closed, we'll return (true, nil) +// if there is an error closing the DB, we'll return (true, err) +// on successful close returns (true, nil) +func (entry *fileDBCacheEntry) CloseDB() (bool, error) { + entry.CVar.L.Lock() + defer entry.CVar.L.Unlock() + if entry.DB == nil { + return true, nil } - entry.DBLock.Unlock() - entry.InUse.Store(false) - // noop for now + if entry.InUse || entry.Waiters > 0 { + return false, nil + } + err := entry.DB.Close() + entry.DB = nil + return true, err +} + +// will create DB if doesn't exist +// will Wait() on CVar if InUse +// updates Waiters appropriately +func (entry *fileDBCacheEntry) GetDB() (*sqlx.DB, error) { + entry.CVar.L.Lock() + defer entry.CVar.L.Unlock() + if entry.OpenErr != nil { + return nil, entry.OpenErr + } + entry.Waiters++ + for { + if entry.InUse { + entry.CVar.Wait() + continue + } + break + } + entry.Waiters-- + if !entry.Migrated { + FileDBMigrateUp(entry.ScreenId) + entry.Migrated = true + } + if entry.DB == nil { + db, err := MakeFileDB(entry.ScreenId) + if err != nil { + entry.OpenErr = err + return nil, err + } + entry.DB = db + } + entry.InUse = true + entry.LastUse = time.Now() + return entry.DB, nil +} + +func (entry *fileDBCacheEntry) ReleaseDB() { + entry.CVar.L.Lock() + defer entry.CVar.L.Unlock() + entry.InUse = false + entry.CVar.Signal() +} + +func (c *FileDBCache) GetDB(screenId string) (*sqlx.DB, error) { + entry := c.GetEntry(screenId) + return entry.GetDB() +} + +func (c *FileDBCache) ReleaseDB(screenId string, db *sqlx.DB) { + entry := c.Cache[screenId] + entry.ReleaseDB() } // fulfills the txwrap DBGetter interface -type DBFileGetter struct { +type FileDBGetter struct { ScreenId string } -func (g DBFileGetter) GetDB(ctx context.Context) (*sqlx.DB, error) { - return globalDBFileCache.GetDB(g.ScreenId) +func (g FileDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) { + return globalFileDBCache.GetDB(g.ScreenId) } -func (g DBFileGetter) ReleaseDB(db *sqlx.DB) { - globalDBFileCache.ReleaseDB(g.ScreenId, db) +func (g FileDBGetter) ReleaseDB(db *sqlx.DB) { + globalFileDBCache.ReleaseDB(g.ScreenId, db) } func TryConvertPtyFile(ctx context.Context, screenId string, lineId string) error { @@ -94,7 +182,7 @@ func TryConvertPtyFile(ctx context.Context, screenId string, lineId string) erro if err != nil { return fmt.Errorf("convert ptyfile, cannot stat: %w", err) } - if stat.DataSize > MaxDBFileSize { + if stat.DataSize > MaxFileDBInlineFileSize { return nil } return nil diff --git a/wavesrv/pkg/sstore/migrate.go b/wavesrv/pkg/sstore/migrate.go index dcd36d9af..61cf91457 100644 --- a/wavesrv/pkg/sstore/migrate.go +++ b/wavesrv/pkg/sstore/migrate.go @@ -22,27 +22,41 @@ import ( "github.com/golang-migrate/migrate/v4" ) -const MaxMigration = 31 +const MaxMainDBMigration = 31 const MigratePrimaryScreenVersion = 9 const CmdScreenSpecialMigration = 13 const CmdLineSpecialMigration = 20 const RISpecialMigration = 30 -func MakeMigrate() (*migrate.Migrate, error) { +func MakeMainDBMigrate() (*migrate.Migrate, error) { fsVar, err := iofs.New(sh2db.MigrationFS, "migrations") if err != nil { return nil, fmt.Errorf("opening iofs: %w", err) } - // migrationPathUrl := fmt.Sprintf("file://%s", path.Join(wd, "db", "migrations")) dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName()) m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) - // m, err := migrate.New(migrationPathUrl, dbUrl) if err != nil { return nil, fmt.Errorf("making migration db[%s]: %w", GetDBName(), err) } return m, nil } +func MakeFileDBMigrate(screenId string) (*migrate.Migrate, error) { + fsVar, err := iofs.New(sh2db.MigrationFS, "filedb-migrations") + if err != nil { + return nil, fmt.Errorf("opening iofs: %w", err) + } + dbUrl, err := MakeFileDBUrl(screenId) + if err != nil { + return nil, fmt.Errorf("making file db url for screenid %s: %w", screenId, err) + } + m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) + if err != nil { + return nil, fmt.Errorf("making migration db[%s]: %w", dbUrl, err) + } + return m, nil +} + func copyFile(srcFile string, dstFile string, notFoundOk bool) error { if srcFile == dstFile { return fmt.Errorf("cannot copy %s to itself", srcFile) @@ -95,11 +109,7 @@ func MigrateUpStep(m *migrate.Migrate, newVersion uint) error { return nil } -func MigrateUp(targetVersion uint) error { - m, err := MakeMigrate() - if err != nil { - return err - } +func MigrateUp(m *migrate.Migrate, targetVersion uint) error { curVersion, dirty, err := MigrateVersion(m) if dirty { return fmt.Errorf("cannot migrate up, database is dirty") @@ -135,11 +145,7 @@ func MigrateUp(targetVersion uint) error { // returns curVersion, dirty, error func MigrateVersion(m *migrate.Migrate) (uint, bool, error) { if m == nil { - var err error - m, err = MakeMigrate() - if err != nil { - return 0, false, err - } + return 0, false, fmt.Errorf("migrate object is nil") } curVersion, dirty, err := m.Version() if err == migrate.ErrNilVersion { @@ -148,52 +154,60 @@ func MigrateVersion(m *migrate.Migrate) (uint, bool, error) { return curVersion, dirty, err } -func MigrateDown() error { - m, err := MakeMigrate() - if err != nil { - return err - } - err = m.Down() +func MigrateDown(m *migrate.Migrate) error { + err := m.Down() if err != nil { return err } return nil } -func MigrateGoto(n uint) error { - curVersion, _, _ := MigrateVersion(nil) +func MigrateGoto(m *migrate.Migrate, n uint) error { + curVersion, _, _ := MigrateVersion(m) if curVersion == n { return nil } if curVersion < n { - return MigrateUp(n) + return MigrateUp(m, n) } - m, err := MakeMigrate() - if err != nil { - return err - } - err = m.Migrate(n) + err := m.Migrate(n) if err != nil { return err } return nil } -func TryMigrateUp() error { - curVersion, _, _ := MigrateVersion(nil) +func MainDBTryMigrateUp() error { + m, err := MakeMainDBMigrate() + if err != nil { + return fmt.Errorf("error trying to run main-db migrations: %w", err) + } + return TryMigrateUp(m) +} + +func FileDBMigrateUp(screenId string) error { + m, err := MakeFileDBMigrate(screenId) + if err != nil { + return fmt.Errorf("error trying to run file-db migrations for screenid %s: %w", screenId, err) + } + return m.Up() +} + +func TryMigrateUp(m *migrate.Migrate) error { + curVersion, _, _ := MigrateVersion(m) log.Printf("[db] db version = %d\n", curVersion) - if curVersion >= MaxMigration { + if curVersion >= MaxMainDBMigration { return nil } - err := MigrateUp(MaxMigration) + err := MigrateUp(m, MaxMainDBMigration) if err != nil { return err } - return MigratePrintVersion() + return MigratePrintVersion(m) } -func MigratePrintVersion() error { - version, dirty, err := MigrateVersion(nil) +func MigratePrintVersion(m *migrate.Migrate) error { + version, dirty, err := MigrateVersion(m) if err != nil { return fmt.Errorf("error getting db version: %v", err) } @@ -204,22 +218,25 @@ func MigratePrintVersion() error { return nil } -func MigrateCommandOpts(opts []string) error { - var err error +func MainDBMigrateCommandOpts(opts []string) error { + m, err := MakeMainDBMigrate() + if err != nil { + return fmt.Errorf("error trying to run main-db migrations: %w", err) + } if opts[0] == "--migrate-up" { fmt.Printf("migrate-up %v\n", GetDBName()) time.Sleep(3 * time.Second) - err = MigrateUp(MaxMigration) + err = MigrateUp(m, MaxMainDBMigration) } else if opts[0] == "--migrate-down" { fmt.Printf("migrate-down %v\n", GetDBName()) time.Sleep(3 * time.Second) - err = MigrateDown() + err = MigrateDown(m) } else if opts[0] == "--migrate-goto" { n, err := strconv.Atoi(opts[1]) if err == nil { fmt.Printf("migrate-goto %v => %d\n", GetDBName(), n) time.Sleep(3 * time.Second) - err = MigrateGoto(uint(n)) + err = MigrateGoto(m, uint(n)) } } else { err = fmt.Errorf("invalid migration command") @@ -230,5 +247,5 @@ func MigrateCommandOpts(opts []string) error { if err != nil { return err } - return MigratePrintVersion() + return MigratePrintVersion(m) }