use new txwrap, simplify

This commit is contained in:
sawka 2024-05-13 11:45:47 -07:00
parent 023e1babe2
commit 7b8c486621
4 changed files with 28 additions and 79 deletions

4
go.mod
View File

@ -6,9 +6,10 @@ toolchain go1.22.1
require ( require (
github.com/golang-migrate/migrate/v4 v4.17.1 github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.4.0
github.com/jmoiron/sqlx v1.4.0 github.com/jmoiron/sqlx v1.4.0
github.com/mattn/go-sqlite3 v1.14.22 github.com/mattn/go-sqlite3 v1.14.22
github.com/sawka/txwrap v0.1.2 github.com/sawka/txwrap v0.2.0
github.com/wailsapp/wails/v3 v3.0.0-alpha.0 github.com/wailsapp/wails/v3 v3.0.0-alpha.0
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94
) )
@ -28,7 +29,6 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect

4
go.sum
View File

@ -106,8 +106,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sawka/txwrap v0.1.2 h1:v8xS0Z1LE7/6vMZA81PYihI+0TSR6Zm1MalzzBIuXKc= github.com/sawka/txwrap v0.2.0 h1:V3LfvKVLULxcYSxdMguLwFyQFMEU9nFDJopg0ZkL+94=
github.com/sawka/txwrap v0.1.2/go.mod h1:T3nlw2gVpuolo6/XEetvBbk1oMXnY978YmBFy1UyHvw= github.com/sawka/txwrap v0.2.0/go.mod h1:wwQ2SQiN4U+6DU/iVPhbvr7OzXAtgZlQCIGuvOswEfA=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=

View File

@ -15,21 +15,18 @@ func initDb(t *testing.T) {
t.Logf("initializing db for %q", t.Name()) t.Logf("initializing db for %q", t.Name())
useTestingDb = true useTestingDb = true
partDataSize = 64 partDataSize = 64
err := MigrateBlockstore(false) err := InitBlockstore()
if err != nil { if err != nil {
t.Fatalf("error migrating blockstore: %v", err) t.Fatalf("error initializing blockstore: %v", err)
} }
} }
func cleanupDb(t *testing.T) { func cleanupDb(t *testing.T) {
t.Logf("cleaning up db for %q", t.Name()) t.Logf("cleaning up db for %q", t.Name())
globalDBLock.Lock()
defer globalDBLock.Unlock()
if globalDB != nil { if globalDB != nil {
globalDB.Close() globalDB.Close()
globalDB = nil globalDB = nil
} }
globalDBErr = nil
useTestingDb = false useTestingDb = false
partDataSize = DefaultPartDataSize partDataSize = DefaultPartDataSize
GBS.clearCache() GBS.clearCache()

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"log" "log"
"path" "path"
"sync"
"time" "time"
"github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/wavebase"
@ -25,26 +24,20 @@ import (
const BlockstoreDbName = "blockstore.db" const BlockstoreDbName = "blockstore.db"
type SingleConnDBGetter struct {
SingleConnLock *sync.Mutex
}
type TxWrap = txwrap.TxWrap type TxWrap = txwrap.TxWrap
var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex{}}
var globalDBLock = &sync.Mutex{}
var globalDB *sqlx.DB var globalDB *sqlx.DB
var globalDBErr error
var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db) var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db)
func InitBlockstore() error { func InitBlockstore() error {
err := MigrateBlockstore(false) ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
var err error
globalDB, err = MakeDB(ctx)
if err != nil { if err != nil {
return err return err
} }
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) err = MigrateBlockstore()
defer cancelFn()
_, err = GetDB(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -57,63 +50,31 @@ func GetDBName() string {
return path.Join(scHome, BlockstoreDbName) return path.Join(scHome, BlockstoreDbName)
} }
func GetDB(ctx context.Context) (*sqlx.DB, error) { func MakeDB(ctx context.Context) (*sqlx.DB, error) {
if txwrap.IsTxWrapContext(ctx) { var rtn *sqlx.DB
return nil, fmt.Errorf("cannot call GetDB from within a running transaction") var err error
} if useTestingDb {
globalDBLock.Lock() dbName := ":memory:"
defer globalDBLock.Unlock() log.Printf("[db] using in-memory db\n")
if globalDB == nil && globalDBErr == nil { rtn, err = sqlx.Open("sqlite3", dbName)
if useTestingDb { } else {
dbName := ":memory:"
globalDB, globalDBErr = sqlx.Open("sqlite3", dbName)
if globalDBErr != nil {
log.Printf("[db] in-memory db err: %v\n", globalDBErr)
} else {
log.Printf("[db] using in-memory db\n")
}
return globalDB, globalDBErr
}
dbName := GetDBName() dbName := GetDBName()
globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName)) log.Printf("[db] opening db %s\n", dbName)
if globalDBErr != nil { rtn, err = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName))
globalDBErr = fmt.Errorf("opening db[%s]: %w", dbName, globalDBErr)
log.Printf("[db] error: %v\n", globalDBErr)
} else {
log.Printf("[db] successfully opened db %s\n", dbName)
}
} }
return globalDB, globalDBErr
}
func (dbg *SingleConnDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) {
db, err := GetDB(ctx)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("opening db: %w", err)
} }
dbg.SingleConnLock.Lock() rtn.DB.SetMaxOpenConns(1)
return db, nil return rtn, nil
}
func (dbg *SingleConnDBGetter) ReleaseDB(db *sqlx.DB) {
dbg.SingleConnLock.Unlock()
} }
func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error { func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error {
return txwrap.DBGWithTx(ctx, dbWrap, fn) return txwrap.WithTx(ctx, globalDB, fn)
} }
func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) { func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) {
var rtn RT return txwrap.WithTxRtn(ctx, globalDB, fn)
txErr := WithTx(ctx, func(tx *TxWrap) error {
temp, err := fn(tx)
if err != nil {
return err
}
rtn = temp
return nil
})
return rtn, txErr
} }
func MakeBlockstoreMigrate() (*migrate.Migrate, error) { func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
@ -121,13 +82,7 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("opening iofs: %w", err) return nil, fmt.Errorf("opening iofs: %w", err)
} }
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) mdriver, err := sqlite3migrate.WithInstance(globalDB.DB, &sqlite3migrate.Config{})
defer cancelFn()
db, err := GetDB(ctx)
if err != nil {
return nil, err
}
mdriver, err := sqlite3migrate.WithInstance(db.DB, &sqlite3migrate.Config{})
if err != nil { if err != nil {
return nil, fmt.Errorf("making blockstore migration driver: %w", err) return nil, fmt.Errorf("making blockstore migration driver: %w", err)
} }
@ -138,7 +93,7 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
return m, nil return m, nil
} }
func MigrateBlockstore(shouldClose bool) error { func MigrateBlockstore() error {
log.Printf("migrate blockstore\n") log.Printf("migrate blockstore\n")
m, err := MakeBlockstoreMigrate() m, err := MakeBlockstoreMigrate()
if err != nil { if err != nil {
@ -151,9 +106,6 @@ func MigrateBlockstore(shouldClose bool) error {
if err != nil { if err != nil {
return fmt.Errorf("cannot get current migration version: %v", err) return fmt.Errorf("cannot get current migration version: %v", err)
} }
if shouldClose {
defer m.Close()
}
err = m.Up() err = m.Up()
if err != nil && err != migrate.ErrNoChange { if err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("migrating blockstore: %w", err) return fmt.Errorf("migrating blockstore: %w", err)