From da03fbe8f2a1e38f76f2f4ba67494b22963efd20 Mon Sep 17 00:00:00 2001
From: sawka
Date: Sun, 12 May 2024 09:52:12 -0700
Subject: [PATCH 01/10] blockstore setup/migrations, wavebase setup

---
 db/db.go                                      |   9 +
 db/migrations-blockstore/000001_init.down.sql |   3 +
 db/migrations-blockstore/000001_init.up.sql   |  20 +++
 frontend/app.tsx                              |   2 +
 frontend/main.ts                              |   2 -
 go.mod                                        |  15 +-
 go.sum                                        |  26 ++-
 main.go                                       |  17 +-
 pkg/blockstore/blockstore.go                  |  51 ++++++
 pkg/blockstore/dbsetup.go                     | 162 ++++++++++++++++++
 pkg/wavebase/wavebase.go                      |  90 ++++++++++
 11 files changed, 389 insertions(+), 8 deletions(-)
 create mode 100644 db/db.go
 create mode 100644 db/migrations-blockstore/000001_init.down.sql
 create mode 100644 db/migrations-blockstore/000001_init.up.sql
 create mode 100644 pkg/blockstore/blockstore.go
 create mode 100644 pkg/blockstore/dbsetup.go
 create mode 100644 pkg/wavebase/wavebase.go

diff --git a/db/db.go b/db/db.go
new file mode 100644
index 000000000..b04d199a7
--- /dev/null
+++ b/db/db.go
@@ -0,0 +1,9 @@
+// Copyright 2023, Command Line Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package db
+
+import "embed"
+
+//go:embed migrations-blockstore/*.sql
+var BlockstoreMigrationFS embed.FS
diff --git a/db/migrations-blockstore/000001_init.down.sql b/db/migrations-blockstore/000001_init.down.sql
new file mode 100644
index 000000000..1cb72b3fd
--- /dev/null
+++ b/db/migrations-blockstore/000001_init.down.sql
@@ -0,0 +1,3 @@
+DROP TABLE db_block_file;
+
+DROP TABLE db_block_data;
diff --git a/db/migrations-blockstore/000001_init.up.sql b/db/migrations-blockstore/000001_init.up.sql
new file mode 100644
index 000000000..dd4b70a0e
--- /dev/null
+++ b/db/migrations-blockstore/000001_init.up.sql
@@ -0,0 +1,20 @@
+CREATE TABLE db_block_file (
+    blockid varchar(36) NOT NULL,
+    name varchar(200) NOT NULL,
+    maxsize bigint NOT NULL,
+    circular boolean NOT NULL,
+    size bigint NOT NULL,
+    createdts bigint NOT NULL,
+    modts bigint NOT NULL,
+    meta json NOT NULL,
+    PRIMARY KEY (blockid, name)
+);
+
+CREATE TABLE db_block_data (
+    blockid varchar(36) NOT NULL,
+    name varchar(200) NOT NULL,
+    partidx int NOT NULL,
+    data blob NOT NULL,
+    PRIMARY KEY(blockid, name, partidx)
+);
+
diff --git a/frontend/app.tsx b/frontend/app.tsx
index afabc94c6..994695f1a 100644
--- a/frontend/app.tsx
+++ b/frontend/app.tsx
@@ -6,6 +6,8 @@ import { Greet } from "./bindings/main/GreetService.js";
 import { Events } from "@wailsio/runtime";
 import * as rx from "rxjs";
 
+import "/public/style.less";
+
 const jotaiStore = createStore();
 const counterSubject = rx.interval(1000).pipe(rx.map((i) => i));
 const timeAtom = jotai.atom("No time yet");
diff --git a/frontend/main.ts b/frontend/main.ts
index 5b81fb9cd..488e54c56 100644
--- a/frontend/main.ts
+++ b/frontend/main.ts
@@ -5,8 +5,6 @@ import * as React from "react";
 import { createRoot } from "react-dom/client";
 import { App } from "./app.tsx";
 
-import "./public/style.less";
-
 document.addEventListener("DOMContentLoaded", () => {
     let reactElem = React.createElement(App, null, null);
     let elem = document.getElementById("main");
diff --git a/go.mod b/go.mod
index d7160d1ad..757ffca7d 100644
--- a/go.mod
+++ b/go.mod
@@ -1,10 +1,16 @@
-module thenextwave
+module github.com/wavetermdev/thenextwave
 
 go 1.22
 
 toolchain go1.22.1
 
-require github.com/wailsapp/wails/v3 v3.0.0-alpha.0
+require (
+	github.com/golang-migrate/migrate/v4 v4.17.1
+	github.com/jmoiron/sqlx v1.4.0
+	github.com/mattn/go-sqlite3 v1.14.22
+	github.com/sawka/txwrap v0.1.2
+	github.com/wailsapp/wails/v3 v3.0.0-alpha.0
+)
 
 require (
dario.cat/mergo v1.0.0 // indirect @@ -21,7 +27,9 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect @@ -40,6 +48,7 @@ require ( github.com/wailsapp/go-webview2 v1.0.9 // indirect github.com/wailsapp/mimetype v1.4.1 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + go.uber.org/atomic v1.7.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect golang.org/x/mod v0.12.0 // indirect diff --git a/go.sum b/go.sum index 5b1c017e9..0ff590522 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= @@ -38,18 +40,29 @@ github.com/go-git/go-git/v5 v5.11.0 h1:XIZc1p+8YzypNr34itUfSvYJcv+eYdTnTvOZ2vD3c github.com/go-git/go-git/v5 v5.11.0/go.mod h1:6GFcX2P3NM7FPBfpePbpLd21XxsgdAt+lKqXmCUiUCY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4= +github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod 
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e h1:Q3+PugElBCf4PFpxhErSzU3/PY5sFL5Z6rfv4AbGAck= github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e/go.mod h1:alcuEEnZsY1WQsagKhZDsoPCRoOijYqhZvPwLG0kzVs= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -63,6 +76,8 @@ github.com/leaanthony/go-ansi-parser v1.6.1 h1:xd8bzARK3dErqkPFtoF9F3/HgN8UQk0ed github.com/leaanthony/go-ansi-parser v1.6.1/go.mod h1:+vva/2y4alzVmmIEpk9QDhA7vLC5zKDTRwfZGOp3IWU= github.com/leaanthony/u v1.1.0 h1:2n0d2BwPVXSUq5yhe8lJPHdxevE2qK5G99PMStMZMaI= github.com/leaanthony/u v1.1.0/go.mod h1:9+o6hejoRljvZ3BzdYlVL0JYCwtnAsVuN9pVTQcaRfI= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= @@ -72,6 +87,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= @@ -89,6 +106,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/sawka/txwrap v0.1.2 h1:v8xS0Z1LE7/6vMZA81PYihI+0TSR6Zm1MalzzBIuXKc= +github.com/sawka/txwrap v0.1.2/go.mod h1:T3nlw2gVpuolo6/XEetvBbk1oMXnY978YmBFy1UyHvw= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= @@ -96,6 +115,7 @@ github.com/skeema/knownhosts v1.2.1 
h1:SHWdIUa82uGZz+F+47k8SY4QhhI291cXCpopT1lK2 github.com/skeema/knownhosts v1.2.1/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -106,6 +126,8 @@ github.com/wailsapp/mimetype v1.4.1/go.mod h1:9aV5k31bBOv5z6u+QP8TltzvNGJPmNJD4X github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= diff --git a/main.go b/main.go index 2005a5017..c857855dc 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,9 @@ import ( "log" "time" + "github.com/wavetermdev/thenextwave/pkg/blockstore" + "github.com/wavetermdev/thenextwave/pkg/wavebase" + "github.com/wailsapp/wails/v3/pkg/application" ) @@ -26,6 +29,18 @@ func (g *GreetService) Greet(name string) string { } func main() { + err := wavebase.EnsureWaveHomeDir() + if err != nil { + log.Printf("error ensuring wave home dir: %v\n", err) + return + } + log.Printf("wave home dir: %s\n", wavebase.GetWaveHomeDir()) + err = blockstore.InitBlockstore() + if err != nil { + log.Printf("error initializing blockstore: %v\n", err) + return + } + app := application.New(application.Options{ Name: "NextWave", Description: "The Next Wave Terminal", @@ -65,7 +80,7 @@ func main() { }() // blocking - err := app.Run() + err = app.Run() // If an error occurred while running the application, log it and exit. if err != nil { diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go new file mode 100644 index 000000000..39b8455a4 --- /dev/null +++ b/pkg/blockstore/blockstore.go @@ -0,0 +1,51 @@ +// Copyright 2024, Command Line Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +package blockstore + +import ( + "database/sql/driver" + "encoding/json" +) + +type FileOptsType struct { + MaxSize int64 + Circular bool + IJson bool +} + +func (f *FileOptsType) Scan(value interface{}) error { + return json.Unmarshal(value.([]byte), f) +} + +func (f FileOptsType) Value() (driver.Value, error) { + barr, err := json.Marshal(f) + if err != nil { + return nil, err + } + return string(barr), nil +} + +type FileMeta map[string]any + +func (m *FileMeta) Scan(value interface{}) error { + return json.Unmarshal(value.([]byte), m) +} + +func (m FileMeta) Value() (driver.Value, error) { + barr, err := json.Marshal(m) + if err != nil { + return nil, err + } + return string(barr), nil +} + +type BlockFile struct { + BlockId string `json:"blockid"` + Name string `json:"name"` + Size int64 `json:"size"` + CreatedTs int64 `json:"createdts"` + ModTs int64 `json:"modts"` + Opts FileOptsType `json:"opts"` + Meta FileMeta `json:"meta"` +} diff --git a/pkg/blockstore/dbsetup.go b/pkg/blockstore/dbsetup.go new file mode 100644 index 000000000..500158d7e --- /dev/null +++ b/pkg/blockstore/dbsetup.go @@ -0,0 +1,162 @@ +package blockstore + +// setup for blockstore db +// includes migration support and txwrap setup + +import ( + "context" + "fmt" + "log" + "path" + "sync" + "time" + + "github.com/wavetermdev/thenextwave/pkg/wavebase" + + "github.com/golang-migrate/migrate/v4" + _ "github.com/golang-migrate/migrate/v4/database/sqlite3" + "github.com/golang-migrate/migrate/v4/source/iofs" + "github.com/jmoiron/sqlx" + _ "github.com/mattn/go-sqlite3" + "github.com/sawka/txwrap" + + dbfs "github.com/wavetermdev/thenextwave/db" +) + +const BlockstoreDbName = "blockstore.db" + +type SingleConnDBGetter struct { + SingleConnLock *sync.Mutex +} + +type TxWrap = txwrap.TxWrap + +var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex{}} +var globalDBLock = &sync.Mutex{} +var globalDB *sqlx.DB +var globalDBErr error + +func InitBlockstore() error { + err := MigrateBlockstore() + if err != nil { + return err + } + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + _, err = GetDB(ctx) + if err != nil { + return err + } + log.Printf("blockstore initialized\n") + return nil +} + +func GetDBName() string { + scHome := wavebase.GetWaveHomeDir() + return path.Join(scHome, BlockstoreDbName) +} + +func GetDB(ctx context.Context) (*sqlx.DB, error) { + if txwrap.IsTxWrapContext(ctx) { + return nil, fmt.Errorf("cannot call GetDB from within a running transaction") + } + globalDBLock.Lock() + defer globalDBLock.Unlock() + if globalDB == nil && globalDBErr == nil { + dbName := GetDBName() + globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName)) + if globalDBErr != nil { + globalDBErr = fmt.Errorf("opening db[%s]: %w", dbName, globalDBErr) + log.Printf("[db] error: %v\n", globalDBErr) + } else { + log.Printf("[db] successfully opened db %s\n", dbName) + } + } + return globalDB, globalDBErr +} + +func (dbg *SingleConnDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) { + db, err := GetDB(ctx) + if err != nil { + return nil, err + } + dbg.SingleConnLock.Lock() + return db, nil +} + +func (dbg *SingleConnDBGetter) ReleaseDB(db *sqlx.DB) { + dbg.SingleConnLock.Unlock() +} + +func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error { + return txwrap.DBGWithTx(ctx, dbWrap, fn) +} + +func WithTxRtn[RT any](ctx context.Context, 
fn func(tx *TxWrap) (RT, error)) (RT, error) { + var rtn RT + txErr := WithTx(ctx, func(tx *TxWrap) error { + temp, err := fn(tx) + if err != nil { + return err + } + rtn = temp + return nil + }) + return rtn, txErr +} + +func MakeBlockstoreMigrate() (*migrate.Migrate, error) { + fsVar, err := iofs.New(dbfs.BlockstoreMigrationFS, "migrations-blockstore") + if err != nil { + return nil, fmt.Errorf("opening iofs: %w", err) + } + dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName()) + m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) + if err != nil { + return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err) + } + return m, nil +} + +func MigrateBlockstore() error { + log.Printf("migrate blockstore\n") + m, err := MakeBlockstoreMigrate() + if err != nil { + return err + } + curVersion, dirty, err := GetMigrateVersion(m) + if dirty { + return fmt.Errorf("cannot migrate up, database is dirty") + } + if err != nil { + return fmt.Errorf("cannot get current migration version: %v", err) + } + defer m.Close() + err = m.Up() + if err != nil && err != migrate.ErrNoChange { + return fmt.Errorf("migrating blockstore: %w", err) + } + newVersion, _, err := GetMigrateVersion(m) + if err != nil { + return fmt.Errorf("cannot get new migration version: %v", err) + } + if newVersion != curVersion { + log.Printf("[db] blockstore migration done, version %d -> %d\n", curVersion, newVersion) + } + return nil +} + +func GetMigrateVersion(m *migrate.Migrate) (uint, bool, error) { + if m == nil { + var err error + m, err = MakeBlockstoreMigrate() + if err != nil { + return 0, false, err + } + } + curVersion, dirty, err := m.Version() + if err == migrate.ErrNilVersion { + return 0, false, nil + } + return curVersion, dirty, err +} diff --git a/pkg/wavebase/wavebase.go b/pkg/wavebase/wavebase.go new file mode 100644 index 000000000..0b8a8d4ae --- /dev/null +++ b/pkg/wavebase/wavebase.go @@ -0,0 +1,90 @@ +package wavebase + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path" + "strings" + "sync" +) + +const DefaultWaveHome = "~/.w2" +const WaveHomeVarName = "WAVETERM_HOME" +const WaveDevVarName = "WAVETERM_DEV" +const HomeVarName = "HOME" + +var baseLock = &sync.Mutex{} +var ensureDirCache = map[string]bool{} + +func IsDevMode() bool { + pdev := os.Getenv(WaveDevVarName) + return pdev != "" +} + +func GetHomeDir() string { + homeVar := os.Getenv(HomeVarName) + if homeVar == "" { + return "/" + } + return homeVar +} + +func ExpandHomeDir(pathStr string) string { + if pathStr != "~" && !strings.HasPrefix(pathStr, "~/") { + return pathStr + } + homeDir := GetHomeDir() + if pathStr == "~" { + return homeDir + } + return path.Join(homeDir, pathStr[2:]) +} + +func GetWaveHomeDir() string { + homeVar := os.Getenv(WaveHomeVarName) + if homeVar != "" { + return ExpandHomeDir(homeVar) + } + return ExpandHomeDir(DefaultWaveHome) +} + +func EnsureWaveHomeDir() error { + return CacheEnsureDir(GetWaveHomeDir(), "wavehome", 0700, "wave home directory") +} + +func CacheEnsureDir(dirName string, cacheKey string, perm os.FileMode, dirDesc string) error { + baseLock.Lock() + ok := ensureDirCache[cacheKey] + baseLock.Unlock() + if ok { + return nil + } + err := TryMkdirs(dirName, perm, dirDesc) + if err != nil { + return err + } + baseLock.Lock() + ensureDirCache[cacheKey] = true + baseLock.Unlock() + return nil +} + +func TryMkdirs(dirName string, perm os.FileMode, dirDesc string) error { + info, err := os.Stat(dirName) + if errors.Is(err, fs.ErrNotExist) { + err = os.MkdirAll(dirName, perm) + if err 
!= nil { + return fmt.Errorf("cannot make %s %q: %w", dirDesc, dirName, err) + } + info, err = os.Stat(dirName) + } + if err != nil { + return fmt.Errorf("error trying to stat %s: %w", dirDesc, err) + } + if !info.IsDir() { + return fmt.Errorf("%s %q must be a directory", dirDesc, dirName) + } + return nil +} From 4da5a4f61033a793cef4acadd0b17f708427354b Mon Sep 17 00:00:00 2001 From: sawka Date: Sun, 12 May 2024 21:59:42 -0700 Subject: [PATCH 02/10] reimplement blockstore, needs testing --- go.mod | 1 + go.sum | 2 + pkg/blockstore/blockstore.go | 422 +++++++++++++++++++++++++++-- pkg/blockstore/blockstore_cache.go | 194 +++++++++++++ pkg/blockstore/blockstore_dbops.go | 122 +++++++++ 5 files changed, 714 insertions(+), 27 deletions(-) create mode 100644 pkg/blockstore/blockstore_cache.go create mode 100644 pkg/blockstore/blockstore_dbops.go diff --git a/go.mod b/go.mod index 757ffca7d..30811ede0 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 github.com/sawka/txwrap v0.1.2 github.com/wailsapp/wails/v3 v3.0.0-alpha.0 + github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 ) require ( diff --git a/go.sum b/go.sum index 0ff590522..2e5c12bf6 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ github.com/wailsapp/go-webview2 v1.0.9 h1:lrU+q0cf1wgLdR69rN+ZnRtMJNaJRrcQ4ELxoO github.com/wailsapp/go-webview2 v1.0.9/go.mod h1:Uk2BePfCRzttBBjFrBmqKGJd41P6QIHeV9kTgIeOZNo= github.com/wailsapp/mimetype v1.4.1 h1:pQN9ycO7uo4vsUUuPeHEYoUkLVkaRntMnHJxVwYhwHs= github.com/wailsapp/mimetype v1.4.1/go.mod h1:9aV5k31bBOv5z6u+QP8TltzvNGJPmNJD4XlAL3U+j3o= +github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 h1:/SPCxd4KHlS4eRTreYEXWFRr8WfRFBcChlV5cgkaO58= +github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94/go.mod h1:ywoo7DXdYueQ0tTPhVoB+wzRTgERSE19EA3mR6KGRaI= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 39b8455a4..e25b084ee 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -3,42 +3,34 @@ package blockstore +// the blockstore package implements a write cache for block files +// it is not a read cache (reads still go to the DB -- unless items are in the cache) +// but all writes only go to the cache, and then the cache is periodically flushed to the DB + import ( - "database/sql/driver" - "encoding/json" + "context" + "fmt" + "sync" + "time" ) +const PartDataSize = 64 * 1024 +const DefaultFlushTime = 5 * time.Second +const NoPartIdx = -1 + +var GlobalBlockStore *BlockStore = &BlockStore{ + Lock: &sync.Mutex{}, + Cache: make(map[cacheKey]*CacheEntry), + FlushTime: DefaultFlushTime, +} + type FileOptsType struct { MaxSize int64 Circular bool IJson bool } -func (f *FileOptsType) Scan(value interface{}) error { - return json.Unmarshal(value.([]byte), f) -} - -func (f FileOptsType) Value() (driver.Value, error) { - barr, err := json.Marshal(f) - if err != nil { - return nil, err - } - return string(barr), nil -} - -type FileMeta map[string]any - -func (m *FileMeta) Scan(value interface{}) error { - return json.Unmarshal(value.([]byte), m) -} - -func (m FileMeta) Value() (driver.Value, error) { - barr, err := json.Marshal(m) - if err != nil { - return nil, err - } - return string(barr), 
nil -} +type FileMeta = map[string]any type BlockFile struct { BlockId string `json:"blockid"` @@ -49,3 +41,379 @@ type BlockFile struct { Opts FileOptsType `json:"opts"` Meta FileMeta `json:"meta"` } + +func copyMeta(meta FileMeta) FileMeta { + newMeta := make(FileMeta) + for k, v := range meta { + newMeta[k] = v + } + return newMeta +} + +func (f *BlockFile) DeepCopy() *BlockFile { + if f == nil { + return nil + } + newFile := *f + newFile.Meta = copyMeta(f.Meta) + return &newFile +} + +func (BlockFile) UseDBMap() {} + +type BlockData struct { + BlockId string `json:"blockid"` + Name string `json:"name"` + PartIdx int `json:"partidx"` + Data []byte `json:"data"` +} + +func (BlockData) UseDBMap() {} + +// synchronous (does not interact with the cache) +func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error { + if opts.MaxSize < 0 { + return fmt.Errorf("max size must be non-negative") + } + if opts.Circular && opts.MaxSize <= 0 { + return fmt.Errorf("circular file must have a max size") + } + if opts.Circular && opts.IJson { + return fmt.Errorf("circular file cannot be ijson") + } + now := time.Now().UnixMilli() + file := &BlockFile{ + BlockId: blockId, + Name: name, + Size: 0, + CreatedTs: now, + ModTs: now, + Opts: opts, + Meta: meta, + } + return dbInsertFile(ctx, file) +} + +func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error { + err := dbDeleteFile(ctx, blockId, name) + if err != nil { + return fmt.Errorf("error deleting file: %v", err) + } + s.withLock(blockId, name, false, func(entry *CacheEntry) { + if entry == nil { + return + } + entry.Deleted = true + }) + return nil +} + +func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error { + fileNames, err := dbGetBlockFileNames(ctx, blockId) + if err != nil { + return fmt.Errorf("error getting block files: %v", err) + } + for _, name := range fileNames { + s.DeleteFile(ctx, blockId, name) + } + return nil +} + +func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) { + file, ok := s.getFileFromCache(blockId, name) + if ok { + return file, nil + } + return dbGetFile(ctx, blockId, name) +} + +func stripNils[T any](arr []*T) []*T { + newArr := make([]*T, 0) + for _, item := range arr { + if item == nil { + continue + } + newArr = append(newArr, item) + } + return newArr +} + +func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { + files, err := dbGetBlockFiles(ctx, blockId) + if err != nil { + return nil, fmt.Errorf("error getting block files: %v", err) + } + // now we wash the files through the cache + var hasNils bool + for idx, dbFile := range files { + cacheFile, ok := s.getFileFromCache(dbFile.BlockId, dbFile.Name) + if ok { + if cacheFile == nil { + hasNils = true + } + files[idx] = cacheFile + } + } + if hasNils { + files = stripNils(files) + } + return files, nil +} + +func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error { + file, ok := s.getFileFromCache(blockId, name) + if !ok { + dbFile, err := dbGetFile(ctx, blockId, name) + if err != nil { + return fmt.Errorf("error getting file: %v", err) + } + file = dbFile + } + if file == nil { + return fmt.Errorf("file not found") + } + var rtnErr error + s.withLock(blockId, name, true, func(entry *CacheEntry) { + if entry.Deleted { + rtnErr = fmt.Errorf("file is deleted") + return + } + newFileEntry := entry.copyOrCreateFileEntry(file) + 
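// note: per the cache invariants above, the FileEntry is replaced wholesale (never mutated in place), so a flush can persist the old copy without holding the store lock +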
newFileEntry.File.Meta = meta
+		entry.FileEntry = newFileEntry
+		entry.FileEntry.File.ModTs = time.Now().UnixMilli()
+		entry.Version++
+	})
+	return rtnErr
+}
+
+func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) {
+	file, ok := s.getFileFromCache(blockId, name)
+	if ok {
+		if file == nil {
+			return nil, fmt.Errorf("file not found")
+		}
+		return file, nil
+	}
+	dbFile, err := dbGetFile(ctx, blockId, name)
+	if err != nil {
+		return nil, fmt.Errorf("error getting file: %v", err)
+	}
+	if dbFile == nil {
+		return nil, fmt.Errorf("file not found")
+	}
+	var rtnErr error
+	rtnFile := dbFile
+	s.withLock(blockId, name, true, func(entry *CacheEntry) {
+		if entry.Deleted {
+			rtnFile = nil
+			rtnErr = fmt.Errorf("file is deleted")
+			return
+		}
+		if entry.FileEntry != nil {
+			// someone beat us to it
+			rtnFile = entry.FileEntry.File.DeepCopy()
+			return
+		}
+		entry.FileEntry = entry.copyOrCreateFileEntry(dbFile)
+		// returns dbFile, nil
+	})
+	return rtnFile, rtnErr
+}
+
+func (f *BlockFile) getLastIncompletePartNum() int {
+	if f.Size%PartDataSize == 0 {
+		return NoPartIdx
+	}
+	return f.partIdxAtOffset(f.Size)
+}
+
+func (f *BlockFile) partIdxAtOffset(offset int64) int {
+	partIdx := int(offset / PartDataSize)
+	if f.Opts.Circular {
+		maxPart := int(f.Opts.MaxSize / PartDataSize)
+		partIdx = partIdx % maxPart
+	}
+	return partIdx
+}
+
+// blockfile must be loaded
+func (s *BlockStore) loadLastDataBlock(ctx context.Context, blockId string, name string) error {
+	var partIdx int
+	err := s.withLockExists(blockId, name, func(entry *CacheEntry) error {
+		partIdx = entry.FileEntry.File.getLastIncompletePartNum()
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	if partIdx == NoPartIdx {
+		return nil
+	}
+	return s.loadDataParts(ctx, blockId, name, []int{partIdx})
+}
+
+func maxOfIntArr(arr []int) int {
+	if len(arr) == 0 {
+		return 0
+	}
+	max := arr[0]
+	for _, v := range arr[1:] {
+		if v > max {
+			max = v
+		}
+	}
+	return max
+}
+
+func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name string, parts []int) error {
+	partDataMap, err := dbGetFileParts(ctx, blockId, name, parts)
+	if err != nil {
+		return fmt.Errorf("error getting file part: %v", err)
+	}
+	maxPart := maxOfIntArr(parts)
+	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
+		entry.ensurePart(maxPart, false)
+		for partIdx, partData := range partDataMap {
+			if entry.DataEntries[partIdx] != nil {
+				// someone beat us to it
+				continue
+			}
+			entry.DataEntries[partIdx] = partData
+		}
+		return nil
+	})
+}
+
+func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) {
+	// a write can extend the file, so compute the end from the write offset, not the old size
+	endWrite := offset + int64(len(data))
+	entry.writeAt(offset, data)
+	if endWrite > entry.FileEntry.File.Size {
+		entry.FileEntry.File.Size = endWrite
+	}
+	entry.FileEntry.File.ModTs = time.Now().UnixMilli()
+	entry.Version++
+}
+
+func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error {
+	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
+		s.writeAt_nolock(entry, entry.FileEntry.File.Size, data)
+		return nil
+	})
+}
+
+func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
+	s.pinCacheEntry(blockId, name)
+	defer s.unpinCacheEntry(blockId, name)
+	_, err := s.loadFileInfo(ctx, blockId, name)
+	if err != nil {
+		return fmt.Errorf("error loading file info: %v", err)
+	}
+	err = s.loadLastDataBlock(ctx, blockId, name)
+	if err != nil {
+
return fmt.Errorf("error loading last data block: %v", err) + } + err = s.appendDataToCache(blockId, name, data) + if err != nil { + return fmt.Errorf("error appending data: %v", err) + } + return nil +} + +func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { + return dbGetAllBlockIds(ctx) +} + +func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { + s.pinCacheEntry(blockId, name) + defer s.unpinCacheEntry(blockId, name) + file, err := s.loadFileInfo(ctx, blockId, name) + if err != nil { + return fmt.Errorf("error loading file info: %v", err) + } + startWriteIdx := offset + endWriteIdx := offset + int64(len(data)) + startPartIdx := file.partIdxAtOffset(startWriteIdx) + endPartIdx := file.partIdxAtOffset(endWriteIdx) + err = s.loadDataParts(ctx, blockId, name, []int{startPartIdx, endPartIdx}) + if err != nil { + return fmt.Errorf("error loading data parts: %v", err) + } + return s.withLockExists(blockId, name, func(entry *CacheEntry) error { + s.writeAt_nolock(entry, offset, data) + return nil + }) +} + +// returns (offset, data, error) +// we return the offset because the offset may have been adjusted if the size was too big (for circular files) +func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) { + s.pinCacheEntry(blockId, name) + defer s.unpinCacheEntry(blockId, name) + file, err := s.Stat(ctx, blockId, name) + if err != nil { + return 0, nil, fmt.Errorf("error getting file: %v", err) + } + if file.Opts.Circular { + // we can do this check here because MaxSize for file cannot be modified + if size > file.Opts.MaxSize { + // just read the last maxsize bytes + sizeTooBig := size - file.Opts.MaxSize + offset += sizeTooBig + } + } + var partsNeeded []int + lastPartOffset := (offset + size) % PartDataSize + endOffsetOfLastPart := offset + size - lastPartOffset + PartDataSize + for i := offset; i < endOffsetOfLastPart; i += PartDataSize { + partsNeeded = append(partsNeeded, file.partIdxAtOffset(i)) + } + dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) + if err != nil { + return 0, nil, fmt.Errorf("error loading data parts: %v", err) + } + // wash the entries through the cache + err = s.withLockExists(blockId, name, func(entry *CacheEntry) error { + if offset+size > entry.FileEntry.File.Size { + // limit read to the actual size of the file + size = entry.FileEntry.File.Size - offset + } + for partIdx, _ := range dataEntries { + if entry.DataEntries[partIdx] != nil { + dataEntries[partIdx] = entry.DataEntries[partIdx] + } + } + return nil + }) + if err != nil { + return 0, nil, fmt.Errorf("error reconciling cache entries: %v", err) + } + // combine the entries into a single byte slice + // note that we only want part of the first and last part depending on offset and size + var rtn []byte + amtLeftToRead := size + curReadOffset := offset + for amtLeftToRead > 0 { + partIdx := file.partIdxAtOffset(curReadOffset) + partDataEntry := dataEntries[partIdx] + var partData []byte + if partDataEntry == nil { + partData = make([]byte, PartDataSize) + } else { + partData = partDataEntry.Data[0:PartDataSize] + } + partOffset := curReadOffset % PartDataSize + amtToRead := minInt64(PartDataSize-partOffset, amtLeftToRead) + rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...) 
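+	// advance past the bytes just copied; a part found in neither the cache nor the DB was zero-filled above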
+ amtLeftToRead -= amtToRead + curReadOffset += amtToRead + } + return offset, rtn, nil +} + +func minInt64(a, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go new file mode 100644 index 000000000..ab88eacef --- /dev/null +++ b/pkg/blockstore/blockstore_cache.go @@ -0,0 +1,194 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package blockstore + +import ( + "fmt" + "sync" + "sync/atomic" + "time" +) + +type cacheKey struct { + BlockId string + Name string +} + +type DataCacheEntry struct { + Dirty *atomic.Bool + PartIdx int + Data []byte // capacity is always BlockDataPartSize +} + +type FileCacheEntry struct { + Dirty *atomic.Bool + File BlockFile +} + +// invariants: +// - we only modify CacheEntry fields when we are holding the BlockStore lock +// - FileEntry can be nil, if pinned +// - FileEntry.File is never updated in place, the entire FileEntry is replaced +// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced +// - when pinned, the cache entry is never removed +// this allows us to flush the cache entry to disk without holding the lock +type CacheEntry struct { + BlockId string + Name string + Version int + PinCount int + Deleted bool + FileEntry *FileCacheEntry + DataEntries []*DataCacheEntry +} + +func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { + for len(e.DataEntries) <= partIdx { + e.DataEntries = append(e.DataEntries, nil) + } + if create && e.DataEntries[partIdx] == nil { + e.DataEntries[partIdx] = &DataCacheEntry{ + PartIdx: partIdx, + Data: make([]byte, 0, PartDataSize), + Dirty: &atomic.Bool{}, + } + } + return e.DataEntries[partIdx] +} + +func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 { + leftInPart := PartDataSize - offset + toWrite := int64(len(data)) + if toWrite > leftInPart { + toWrite = leftInPart + } + if int64(len(dce.Data)) < offset+toWrite { + dce.Data = dce.Data[:offset+toWrite] + } + copy(dce.Data[offset:], data[:toWrite]) + dce.Dirty.Store(true) + return toWrite +} + +func (entry *CacheEntry) writeAt(offset int64, data []byte) { + for len(data) > 0 { + partIdx := int(offset / PartDataSize) + if entry.FileEntry.File.Opts.Circular { + maxPart := int(entry.FileEntry.File.Opts.MaxSize / PartDataSize) + partIdx = partIdx % maxPart + } + partOffset := offset % PartDataSize + partData := entry.ensurePart(partIdx, true) + nw := partData.writeToPart(partOffset, data) + data = data[nw:] + offset += nw + } +} + +type BlockStore struct { + Lock *sync.Mutex + Cache map[cacheKey]*CacheEntry + FlushTime time.Duration +} + +func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + if shouldCreate { + entry = &CacheEntry{ + BlockId: blockId, + Name: name, + PinCount: 0, + FileEntry: nil, + DataEntries: nil, + } + s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry + } + } + f(entry) +} + +func (s *BlockStore) withLockExists(blockId string, name string, f func(*CacheEntry) error) error { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil || entry.Deleted || entry.FileEntry == nil { + return fmt.Errorf("file not found") + } + return f(entry) +} + +func (s *BlockStore) pinCacheEntry(blockId string, name string) { + s.Lock.Lock() + 
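// pinning keeps the entry out of tryDeleteCacheEntry's reach while a multi-step load/read/write is in flight +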
defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + entry = &CacheEntry{ + BlockId: blockId, + Name: name, + PinCount: 0, + FileEntry: nil, + DataEntries: nil, + } + s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry + } + entry.PinCount++ +} + +func (s *BlockStore) unpinCacheEntry(blockId string, name string) { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + // this is not good + return + } + entry.PinCount-- +} + +func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + return + } + if entry.PinCount > 0 { + return + } + delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) +} + +// getFileFromCache returns the file from the cache if it exists +// return (file, cached) +func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + return nil, false + } + if entry.Deleted { + return nil, true + } + if entry.FileEntry == nil { + return nil, false + } + return entry.FileEntry.File.DeepCopy(), true +} + +func (e *CacheEntry) copyOrCreateFileEntry(dbFile *BlockFile) *FileCacheEntry { + if e.FileEntry == nil { + return &FileCacheEntry{ + Dirty: &atomic.Bool{}, + File: *dbFile, + } + } + return &FileCacheEntry{ + Dirty: &atomic.Bool{}, + File: *e.FileEntry.File.DeepCopy(), + } +} diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go new file mode 100644 index 000000000..583737ef7 --- /dev/null +++ b/pkg/blockstore/blockstore_dbops.go @@ -0,0 +1,122 @@ +package blockstore + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil" +) + +func dbInsertFile(ctx context.Context, file *BlockFile) error { + // will fail if file already exists + return WithTx(ctx, func(tx *TxWrap) error { + query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" + tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, file.Opts, file.Meta) + return nil + }) +} + +func dbDeleteFile(ctx context.Context, blockId string, name string) error { + return WithTx(ctx, func(tx *TxWrap) error { + query := "DELETE FROM db_block_file WHERE blockid = ? AND name = ?" + tx.Exec(query, blockId, name) + query = "DELETE FROM db_block_data WHERE blockid = ? AND name = ?" + tx.Exec(query, blockId, name) + return nil + }) +} + +func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error) { + return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) { + var files []string + query := "SELECT name FROM db_block_file WHERE blockid = ?" + tx.Select(&files, query, blockId) + return files, nil + }) +} + +func dbDeleteBlock(ctx context.Context, blockId string) error { + return WithTx(ctx, func(tx *TxWrap) error { + query := "DELETE FROM db_block_file WHERE blockid = ?" + tx.Exec(query, blockId) + query = "DELETE FROM db_block_data WHERE blockid = ?" + tx.Exec(query, blockId) + return nil + }) +} + +func dbGetFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { + return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) { + var file BlockFile + query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?" 
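+	// (blockid, name) is the primary key, so at most one row can match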
+ tx.Get(&file, query, blockId, name) + return &file, nil + }) +} + +func dbGetAllBlockIds(ctx context.Context) ([]string, error) { + return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) { + var ids []string + query := "SELECT DISTINCT blockid FROM db_block_file" + tx.Select(&ids, query) + return ids, nil + }) +} + +func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) { + return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) { + var data []*DataCacheEntry + query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))" + tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts)) + rtn := make(map[int]*DataCacheEntry) + for _, d := range data { + d.Dirty = &atomic.Bool{} + rtn[d.PartIdx] = d + } + return rtn, nil + }) +} + +func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { + return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) { + var files []*BlockFile + query := "SELECT * FROM db_block_file WHERE blockid = ?" + tx.Select(&files, query, blockId) + return files, nil + }) +} + +func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntries []*DataCacheEntry) error { + if fileEntry == nil { + return fmt.Errorf("fileEntry or fileEntry.File is nil") + } + return WithTx(ctx, func(tx *TxWrap) error { + query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?` + if !tx.Exists(query, fileEntry.File.BlockId, fileEntry.File.Name) { + // since deletion is synchronous this stops us from writing to a deleted file + return fmt.Errorf("file not found in db") + } + if fileEntry.Dirty.Load() { + query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? 
AND name = ?` + tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, fileEntry.File.Opts, fileEntry.File.Meta, fileEntry.File.BlockId, fileEntry.File.Name) + } + dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)` + for _, dataEntry := range dataEntries { + if dataEntry == nil || !dataEntry.Dirty.Load() { + continue + } + tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data) + } + if tx.Err == nil { + // clear dirty flags + fileEntry.Dirty.Store(false) + for _, dataEntry := range dataEntries { + if dataEntry != nil { + dataEntry.Dirty.Store(false) + } + } + } + return nil + }) +} From 68a6605209ed90e0959e1443e8f6b43f1e6473d5 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 13 May 2024 00:02:32 -0700 Subject: [PATCH 03/10] enable testing, bug fixes --- db/migrations-blockstore/000001_init.up.sql | 3 +- pkg/blockstore/blockstore.go | 37 ++++++---- pkg/blockstore/blockstore_cache.go | 10 +-- pkg/blockstore/blockstore_dbops.go | 14 ++-- pkg/blockstore/blockstore_test.go | 79 +++++++++++++++++++++ pkg/blockstore/dbsetup.go | 34 +++++++-- 6 files changed, 141 insertions(+), 36 deletions(-) create mode 100644 pkg/blockstore/blockstore_test.go diff --git a/db/migrations-blockstore/000001_init.up.sql b/db/migrations-blockstore/000001_init.up.sql index dd4b70a0e..fce5a6986 100644 --- a/db/migrations-blockstore/000001_init.up.sql +++ b/db/migrations-blockstore/000001_init.up.sql @@ -1,11 +1,10 @@ CREATE TABLE db_block_file ( blockid varchar(36) NOT NULL, name varchar(200) NOT NULL, - maxsize bigint NOT NULL, - circular boolean NOT NULL, size bigint NOT NULL, createdts bigint NOT NULL, modts bigint NOT NULL, + opts json NOT NULL, meta json NOT NULL, PRIMARY KEY (blockid, name) ); diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index e25b084ee..95514f4a8 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -14,11 +14,13 @@ import ( "time" ) -const PartDataSize = 64 * 1024 +const DefaultPartDataSize = 64 * 1024 const DefaultFlushTime = 5 * time.Second const NoPartIdx = -1 -var GlobalBlockStore *BlockStore = &BlockStore{ +var partDataSize int64 = DefaultPartDataSize // overridden in tests + +var GBS *BlockStore = &BlockStore{ Lock: &sync.Mutex{}, Cache: make(map[cacheKey]*CacheEntry), FlushTime: DefaultFlushTime, @@ -81,6 +83,11 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, if opts.Circular && opts.IJson { return fmt.Errorf("circular file cannot be ijson") } + if opts.Circular { + if opts.MaxSize%partDataSize != 0 { + opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize + } + } now := time.Now().UnixMilli() file := &BlockFile{ BlockId: blockId, @@ -124,7 +131,7 @@ func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*Bl if ok { return file, nil } - return dbGetFile(ctx, blockId, name) + return dbGetBlockFile(ctx, blockId, name) } func stripNils[T any](arr []*T) []*T { @@ -163,7 +170,7 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error { file, ok := s.getFileFromCache(blockId, name) if !ok { - dbFile, err := dbGetFile(ctx, blockId, name) + dbFile, err := dbGetBlockFile(ctx, blockId, name) if err != nil { return fmt.Errorf("error getting file: %v", err) } @@ -195,7 +202,7 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, 
blockId string, name stri } return file, nil } - dbFile, err := dbGetFile(ctx, blockId, name) + dbFile, err := dbGetBlockFile(ctx, blockId, name) if err != nil { return nil, fmt.Errorf("error getting file: %v", err) } @@ -222,16 +229,16 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri } func (f *BlockFile) getLastIncompletePartNum() int { - if f.Size%PartDataSize == 0 { + if f.Size%partDataSize == 0 { return NoPartIdx } return f.partIdxAtOffset(f.Size) } func (f *BlockFile) partIdxAtOffset(offset int64) int { - partIdx := int(offset / PartDataSize) + partIdx := int(offset / partDataSize) if f.Opts.Circular { - maxPart := int(f.Opts.MaxSize / PartDataSize) + maxPart := int(f.Opts.MaxSize / partDataSize) partIdx = partIdx % maxPart } return partIdx @@ -363,9 +370,9 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of } } var partsNeeded []int - lastPartOffset := (offset + size) % PartDataSize - endOffsetOfLastPart := offset + size - lastPartOffset + PartDataSize - for i := offset; i < endOffsetOfLastPart; i += PartDataSize { + lastPartOffset := (offset + size) % partDataSize + endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize + for i := offset; i < endOffsetOfLastPart; i += partDataSize { partsNeeded = append(partsNeeded, file.partIdxAtOffset(i)) } dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) @@ -398,12 +405,12 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of partDataEntry := dataEntries[partIdx] var partData []byte if partDataEntry == nil { - partData = make([]byte, PartDataSize) + partData = make([]byte, partDataSize) } else { - partData = partDataEntry.Data[0:PartDataSize] + partData = partDataEntry.Data[0:partDataSize] } - partOffset := curReadOffset % PartDataSize - amtToRead := minInt64(PartDataSize-partOffset, amtLeftToRead) + partOffset := curReadOffset % partDataSize + amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead) rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...) 
amtLeftToRead -= amtToRead curReadOffset += amtToRead diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index ab88eacef..dc6ba26bc 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -50,7 +50,7 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { if create && e.DataEntries[partIdx] == nil { e.DataEntries[partIdx] = &DataCacheEntry{ PartIdx: partIdx, - Data: make([]byte, 0, PartDataSize), + Data: make([]byte, 0, partDataSize), Dirty: &atomic.Bool{}, } } @@ -58,7 +58,7 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { } func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 { - leftInPart := PartDataSize - offset + leftInPart := partDataSize - offset toWrite := int64(len(data)) if toWrite > leftInPart { toWrite = leftInPart @@ -73,12 +73,12 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 { func (entry *CacheEntry) writeAt(offset int64, data []byte) { for len(data) > 0 { - partIdx := int(offset / PartDataSize) + partIdx := int(offset / partDataSize) if entry.FileEntry.File.Opts.Circular { - maxPart := int(entry.FileEntry.File.Opts.MaxSize / PartDataSize) + maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize) partIdx = partIdx % maxPart } - partOffset := offset % PartDataSize + partOffset := offset % partDataSize partData := entry.ensurePart(partIdx, true) nw := partData.writeToPart(partOffset, data) data = data[nw:] diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go index 583737ef7..03d338e7d 100644 --- a/pkg/blockstore/blockstore_dbops.go +++ b/pkg/blockstore/blockstore_dbops.go @@ -12,7 +12,7 @@ func dbInsertFile(ctx context.Context, file *BlockFile) error { // will fail if file already exists return WithTx(ctx, func(tx *TxWrap) error { query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" - tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, file.Opts, file.Meta) + tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta)) return nil }) } @@ -46,12 +46,11 @@ func dbDeleteBlock(ctx context.Context, blockId string) error { }) } -func dbGetFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { +func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) { - var file BlockFile query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?" - tx.Get(&file, query, blockId, name) - return &file, nil + file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name) + return file, nil }) } @@ -80,9 +79,8 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) { - var files []*BlockFile query := "SELECT * FROM db_block_file WHERE blockid = ?" - tx.Select(&files, query, blockId) + files := dbutil.SelectMappable[*BlockFile](tx, query, blockId) return files, nil }) } @@ -99,7 +97,7 @@ func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntri } if fileEntry.Dirty.Load() { query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? 
AND name = ?` - tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, fileEntry.File.Opts, fileEntry.File.Meta, fileEntry.File.BlockId, fileEntry.File.Name) + tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, dbutil.QuickJson(fileEntry.File.Opts), dbutil.QuickJson(fileEntry.File.Meta), fileEntry.File.BlockId, fileEntry.File.Name) } dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)` for _, dataEntry := range dataEntries { diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go new file mode 100644 index 000000000..2fad077f1 --- /dev/null +++ b/pkg/blockstore/blockstore_test.go @@ -0,0 +1,79 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package blockstore + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" +) + +func initDb(t *testing.T) { + t.Logf("initializing db for %q", t.Name()) + useTestingDb = true + partDataSize = 64 + err := MigrateBlockstore(false) + if err != nil { + t.Fatalf("error migrating blockstore: %v", err) + } +} + +func cleanupDb(t *testing.T) { + t.Logf("cleaning up db for %q", t.Name()) + globalDBLock.Lock() + defer globalDBLock.Unlock() + if globalDB != nil { + globalDB.Close() + globalDB = nil + } + globalDBErr = nil + useTestingDb = false + partDataSize = DefaultPartDataSize +} + +func TestCreate(t *testing.T) { + initDb(t) + defer cleanupDb(t) + + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + file, err := GBS.Stat(ctx, blockId, "testfile") + if err != nil { + t.Fatalf("error stating file: %v", err) + } + if file == nil { + t.Fatalf("file not found") + } + if file.BlockId != blockId { + t.Fatalf("block id mismatch") + } + if file.Name != "testfile" { + t.Fatalf("name mismatch") + } + if file.Size != 0 { + t.Fatalf("size mismatch") + } + if file.CreatedTs == 0 { + t.Fatalf("created ts zero") + } + if file.ModTs == 0 { + t.Fatalf("mod ts zero") + } + if file.CreatedTs != file.ModTs { + t.Fatalf("create ts != mod ts") + } + if len(file.Meta) != 0 { + t.Fatalf("meta should have no values") + } + if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 { + t.Fatalf("opts not empty") + } +} diff --git a/pkg/blockstore/dbsetup.go b/pkg/blockstore/dbsetup.go index 500158d7e..49f7f9019 100644 --- a/pkg/blockstore/dbsetup.go +++ b/pkg/blockstore/dbsetup.go @@ -14,7 +14,7 @@ import ( "github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/golang-migrate/migrate/v4" - _ "github.com/golang-migrate/migrate/v4/database/sqlite3" + sqlite3migrate "github.com/golang-migrate/migrate/v4/database/sqlite3" "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/jmoiron/sqlx" _ "github.com/mattn/go-sqlite3" @@ -35,9 +35,10 @@ var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex var globalDBLock = &sync.Mutex{} var globalDB *sqlx.DB var globalDBErr error +var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db) func InitBlockstore() error { - err := MigrateBlockstore() + err := MigrateBlockstore(false) if err != nil { return err } @@ -63,6 +64,16 @@ func GetDB(ctx context.Context) (*sqlx.DB, error) { globalDBLock.Lock() defer globalDBLock.Unlock() if globalDB == nil && globalDBErr == nil { + if useTestingDb 
{ + dbName := ":memory:" + globalDB, globalDBErr = sqlx.Open("sqlite3", dbName) + if globalDBErr != nil { + log.Printf("[db] in-memory db err: %v\n", globalDBErr) + } else { + log.Printf("[db] using in-memory db\n") + } + return globalDB, globalDBErr + } dbName := GetDBName() globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName)) if globalDBErr != nil { @@ -110,15 +121,24 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) { if err != nil { return nil, fmt.Errorf("opening iofs: %w", err) } - dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName()) - m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + db, err := GetDB(ctx) + if err != nil { + return nil, err + } + mdriver, err := sqlite3migrate.WithInstance(db.DB, &sqlite3migrate.Config{}) + if err != nil { + return nil, fmt.Errorf("making blockstore migration driver: %w", err) + } + m, err := migrate.NewWithInstance("iofs", fsVar, "sqlite3", mdriver) if err != nil { return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err) } return m, nil } -func MigrateBlockstore() error { +func MigrateBlockstore(shouldClose bool) error { log.Printf("migrate blockstore\n") m, err := MakeBlockstoreMigrate() if err != nil { @@ -131,7 +151,9 @@ func MigrateBlockstore() error { if err != nil { return fmt.Errorf("cannot get current migration version: %v", err) } - defer m.Close() + if shouldClose { + defer m.Close() + } err = m.Up() if err != nil && err != migrate.ErrNoChange { return fmt.Errorf("migrating blockstore: %w", err) From 9bb6e2720175eb1044eafa1d9d822ef3deed9903 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 13 May 2024 00:08:50 -0700 Subject: [PATCH 04/10] test writemeta --- pkg/blockstore/blockstore_cache.go | 14 ++++++++++ pkg/blockstore/blockstore_test.go | 43 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index dc6ba26bc..8b72fa5a8 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -43,6 +43,20 @@ type CacheEntry struct { DataEntries []*DataCacheEntry } +// for testing +func (s *BlockStore) getCacheSize() int { + s.Lock.Lock() + defer s.Lock.Unlock() + return len(s.Cache) +} + +// for testing +func (s *BlockStore) clearCache() { + s.Lock.Lock() + defer s.Lock.Unlock() + s.Cache = make(map[cacheKey]*CacheEntry) +} + func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { for len(e.DataEntries) <= partIdx { e.DataEntries = append(e.DataEntries, nil) diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 2fad077f1..4f4dec6a8 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -32,6 +32,7 @@ func cleanupDb(t *testing.T) { globalDBErr = nil useTestingDb = false partDataSize = DefaultPartDataSize + GBS.clearCache() } func TestCreate(t *testing.T) { @@ -77,3 +78,45 @@ func TestCreate(t *testing.T) { t.Fatalf("opts not empty") } } + +func checkMapsEqual(t *testing.T, m1 map[string]any, m2 map[string]any, msg string) { + if len(m1) != len(m2) { + t.Errorf("%s: map length mismatch", msg) + } + for k, v := range m1 { + if m2[k] != v { + t.Errorf("%s: value mismatch for key %q", msg, k) + } + } +} + +func TestSetMeta(t *testing.T) { + initDb(t) + defer cleanupDb(t) + + ctx, cancelFn := 
From b0762f5ce16e33efcd1e95782a68421bf3d2ae75 Mon Sep 17 00:00:00 2001
From: sawka
Date: Mon, 13 May 2024 00:12:55 -0700
Subject: [PATCH 05/10] allow meta merging

---
 pkg/blockstore/blockstore.go      | 14 ++++++++++++--
 pkg/blockstore/blockstore_test.go | 16 ++++++++++++++--
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go
index 95514f4a8..36cde7e47 100644
--- a/pkg/blockstore/blockstore.go
+++ b/pkg/blockstore/blockstore.go
@@ -167,7 +167,7 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil
 	return files, nil
 }
 
-func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error {
+func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error {
 	file, ok := s.getFileFromCache(blockId, name)
 	if !ok {
 		dbFile, err := dbGetBlockFile(ctx, blockId, name)
@@ -186,7 +186,17 @@ func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string,
 			return
 		}
 		newFileEntry := entry.copyOrCreateFileEntry(file)
-		newFileEntry.File.Meta = meta
+		if merge {
+			for k, v := range meta {
+				if v == nil {
+					delete(newFileEntry.File.Meta, k)
+					continue
+				}
+				newFileEntry.File.Meta[k] = v
+			}
+		} else {
+			newFileEntry.File.Meta = meta
+		}
 		entry.FileEntry = newFileEntry
 		entry.FileEntry.File.ModTs = time.Now().UnixMilli()
 		entry.Version++
diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go
index 4f4dec6a8..d963dd8cd 100644
--- a/pkg/blockstore/blockstore_test.go
+++ b/pkg/blockstore/blockstore_test.go
@@ -104,7 +104,7 @@ func TestSetMeta(t *testing.T) {
 	if GBS.getCacheSize() != 0 {
 		t.Errorf("cache size mismatch -- should have 0 entries after create")
 	}
-	err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 5, "b": "hello"})
+	err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false)
 	if err != nil {
 		t.Fatalf("error setting meta: %v", err)
 	}
@@ -115,8 +115,20 @@ func TestSetMeta(t *testing.T) {
 	if file == nil {
 		t.Fatalf("file not found")
 	}
-	checkMapsEqual(t, map[string]any{"a": 5, "b": "hello"}, file.Meta, "meta")
+	checkMapsEqual(t, map[string]any{"a": 5, "b": "hello", "q": 8}, file.Meta, "meta")
 	if GBS.getCacheSize() != 1 {
 		t.Errorf("cache size mismatch")
 	}
+	err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true)
+	if err != nil {
+		t.Fatalf("error setting meta: %v", err)
+	}
+	file, err = GBS.Stat(ctx, blockId, "testfile")
+	if err != nil {
+		t.Fatalf("error stating file: %v", err)
+	}
+	if file == nil {
+		t.Fatalf("file not found")
+	}
+	checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta")
 }
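Note: in merge mode a nil value acts as a tombstone, which is why the test expects "q" to disappear. The semantics in isolation, as a sketch of the merge loop the patch adds:

    merged := map[string]any{"a": 5, "b": "hello", "q": 8}
    patch := map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}
    for k, v := range patch {
        if v == nil {
            delete(merged, k) // nil deletes the key
            continue
        }
        merged[k] = v
    }
    // merged is now {"a": 6, "b": "hello", "c": "world", "d": 7}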
"hello", "c": "world", "d": 7}, file.Meta, "meta") } From 023e1babe2e1b0a3d2965e761476f6e3da7cb521 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 13 May 2024 00:33:46 -0700 Subject: [PATCH 06/10] test simple append --- pkg/blockstore/blockstore.go | 18 ++++++++-- pkg/blockstore/blockstore_cache.go | 30 +++++++++++++++++ pkg/blockstore/blockstore_test.go | 54 ++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 36cde7e47..4329624fc 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -395,10 +395,11 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of // limit read to the actual size of the file size = entry.FileEntry.File.Size - offset } - for partIdx, _ := range dataEntries { - if entry.DataEntries[partIdx] != nil { - dataEntries[partIdx] = entry.DataEntries[partIdx] + for _, partIdx := range partsNeeded { + if len(entry.DataEntries) <= partIdx || entry.DataEntries[partIdx] == nil { + continue } + dataEntries[partIdx] = entry.DataEntries[partIdx] } return nil }) @@ -428,6 +429,17 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of return offset, rtn, nil } +func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (int64, []byte, error) { + file, err := s.Stat(ctx, blockId, name) + if err != nil { + return 0, nil, fmt.Errorf("error getting file: %v", err) + } + if file == nil { + return 0, nil, fmt.Errorf("file not found") + } + return s.ReadAt(ctx, blockId, name, 0, file.Size) +} + func minInt64(a, b int64) int64 { if a < b { return a diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 8b72fa5a8..b9b85324a 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -4,6 +4,7 @@ package blockstore import ( + "bytes" "fmt" "sync" "sync/atomic" @@ -43,6 +44,35 @@ type CacheEntry struct { DataEntries []*DataCacheEntry } +func (e *CacheEntry) dump() string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, Version: %d, PinCount: %d, Deleted: %v\n", e.BlockId, e.Name, e.Version, e.PinCount, e.Deleted) + if e.FileEntry != nil { + fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File) + } + for i, dce := range e.DataEntries { + if dce != nil { + fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", i, dce.Dirty.Load(), string(dce.Data)) + } + } + buf.WriteString("}\n") + return buf.String() +} + +func (s *BlockStore) dump() string { + s.Lock.Lock() + defer s.Lock.Unlock() + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache))) + for _, v := range s.Cache { + entryStr := v.dump() + buf.WriteString(entryStr) + buf.WriteString("\n") + } + return buf.String() + +} + // for testing func (s *BlockStore) getCacheSize() int { s.Lock.Lock() diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index d963dd8cd..bfdd80f1f 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -132,3 +132,57 @@ func TestSetMeta(t *testing.T) { } checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta") } + +func checkFileSize(t *testing.T, ctx context.Context, blockId string, name string, size int64) { + file, err := GBS.Stat(ctx, blockId, name) + if err != nil { + t.Errorf("error stating file %q: %v", name, err) + return + } + if file == nil { + t.Errorf("file %q not found", 
From 7b8c4866213de7337ec12d2acd3714acbe2ea3e9 Mon Sep 17 00:00:00 2001
From: sawka
Date: Mon, 13 May 2024 11:45:47 -0700
Subject: [PATCH 07/10] use new txwrap, simplify

---
 go.mod                            |  4 +-
 go.sum                            |  4 +-
 pkg/blockstore/blockstore_test.go |  7 +--
 pkg/blockstore/dbsetup.go         | 92 ++++++++-----------------------
 4 files changed, 28 insertions(+), 79 deletions(-)

diff --git a/go.mod b/go.mod
index 30811ede0..82c6688e5 100644
--- a/go.mod
+++ b/go.mod
@@ -6,9 +6,10 @@ toolchain go1.22.1
 
 require (
 	github.com/golang-migrate/migrate/v4 v4.17.1
+	github.com/google/uuid v1.4.0
 	github.com/jmoiron/sqlx v1.4.0
 	github.com/mattn/go-sqlite3 v1.14.22
-	github.com/sawka/txwrap v0.1.2
+	github.com/sawka/txwrap v0.2.0
 	github.com/wailsapp/wails/v3 v3.0.0-alpha.0
 	github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94
 )
@@ -28,7 +29,6 @@ require (
 	github.com/go-ole/go-ole v1.2.6 // indirect
 	github.com/godbus/dbus/v5 v5.1.0 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
-	github.com/google/uuid v1.4.0 // indirect
 	github.com/hashicorp/errwrap v1.1.0 // indirect
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
diff --git a/go.sum b/go.sum
index 2e5c12bf6..9c2e268d0 100644
--- a/go.sum
+++ b/go.sum
@@ -106,8 +106,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
 github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
 github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
 github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
-github.com/sawka/txwrap v0.1.2 h1:v8xS0Z1LE7/6vMZA81PYihI+0TSR6Zm1MalzzBIuXKc=
-github.com/sawka/txwrap v0.1.2/go.mod h1:T3nlw2gVpuolo6/XEetvBbk1oMXnY978YmBFy1UyHvw=
+github.com/sawka/txwrap v0.2.0 h1:V3LfvKVLULxcYSxdMguLwFyQFMEU9nFDJopg0ZkL+94=
+github.com/sawka/txwrap v0.2.0/go.mod h1:wwQ2SQiN4U+6DU/iVPhbvr7OzXAtgZlQCIGuvOswEfA=
 github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
 github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go
index bfdd80f1f..8ea04201b 100644
--- a/pkg/blockstore/blockstore_test.go
+++ b/pkg/blockstore/blockstore_test.go
@@ -15,21 +15,18 @@ func initDb(t *testing.T) {
 	t.Logf("initializing db for %q", t.Name())
 	useTestingDb = true
 	partDataSize = 64
-	err := MigrateBlockstore(false)
+	err := InitBlockstore()
 	if err != nil {
-		t.Fatalf("error migrating blockstore: %v", err)
+		t.Fatalf("error initializing blockstore: %v", err)
 	}
 }
 
 func cleanupDb(t *testing.T) {
 	t.Logf("cleaning up db for %q", t.Name())
-	globalDBLock.Lock()
-	defer globalDBLock.Unlock()
 	if globalDB != nil {
 		globalDB.Close()
 		globalDB = nil
 	}
-	globalDBErr = nil
 	useTestingDb = false
 	partDataSize = DefaultPartDataSize
 	GBS.clearCache()
diff --git a/pkg/blockstore/dbsetup.go b/pkg/blockstore/dbsetup.go
index 49f7f9019..406f8433b 100644
--- a/pkg/blockstore/dbsetup.go
+++ b/pkg/blockstore/dbsetup.go
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"log"
 	"path"
-	"sync"
 	"time"
 
 	"github.com/wavetermdev/thenextwave/pkg/wavebase"
@@ -25,26 +24,20 @@ import (
 
 const BlockstoreDbName = "blockstore.db"
 
-type SingleConnDBGetter struct {
-	SingleConnLock *sync.Mutex
-}
-
 type TxWrap = txwrap.TxWrap
 
-var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex{}}
-var globalDBLock = &sync.Mutex{}
 var globalDB *sqlx.DB
-var globalDBErr error
 var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db)
 
 func InitBlockstore() error {
-	err := MigrateBlockstore(false)
+	ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancelFn()
+	var err error
+	globalDB, err = MakeDB(ctx)
 	if err != nil {
 		return err
 	}
-	ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
-	defer cancelFn()
-	_, err = GetDB(ctx)
+	err = MigrateBlockstore()
 	if err != nil {
 		return err
 	}
@@ -57,63 +50,31 @@ func GetDBName() string {
 	return path.Join(scHome, BlockstoreDbName)
 }
 
-func GetDB(ctx context.Context) (*sqlx.DB, error) {
-	if txwrap.IsTxWrapContext(ctx) {
-		return nil, fmt.Errorf("cannot call GetDB from within a running transaction")
-	}
-	globalDBLock.Lock()
-	defer globalDBLock.Unlock()
-	if globalDB == nil && globalDBErr == nil {
-		if useTestingDb {
-			dbName := ":memory:"
-			globalDB, globalDBErr = sqlx.Open("sqlite3", dbName)
-			if globalDBErr != nil {
-				log.Printf("[db] in-memory db err: %v\n", globalDBErr)
-			} else {
-				log.Printf("[db] using in-memory db\n")
-			}
-			return globalDB, globalDBErr
-		}
+func MakeDB(ctx context.Context) (*sqlx.DB, error) {
+	var rtn *sqlx.DB
+	var err error
+	if useTestingDb {
+		dbName := ":memory:"
+		log.Printf("[db] using in-memory db\n")
+		rtn, err = sqlx.Open("sqlite3", dbName)
+	} else {
 		dbName := GetDBName()
-		globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName))
-		if globalDBErr != nil {
-			globalDBErr = fmt.Errorf("opening db[%s]: %w", dbName, globalDBErr)
-			log.Printf("[db] error: %v\n", globalDBErr)
-		} else {
-			log.Printf("[db] successfully opened db %s\n", dbName)
-		}
+		log.Printf("[db] opening db %s\n", dbName)
+		rtn, err = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName))
 	}
-	return globalDB, globalDBErr
-}
-
-func (dbg *SingleConnDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) {
-	db, err := GetDB(ctx)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("opening db: %w", err)
 	}
-	dbg.SingleConnLock.Lock()
-	return db, nil
-}
-
-func (dbg *SingleConnDBGetter) ReleaseDB(db *sqlx.DB) {
-	dbg.SingleConnLock.Unlock()
+	rtn.DB.SetMaxOpenConns(1)
+	return rtn, nil
 }
 
 func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error {
-	return txwrap.DBGWithTx(ctx, dbWrap, fn)
+	return txwrap.WithTx(ctx, globalDB, fn)
 }
 
 func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) {
-	var rtn RT
-	txErr := WithTx(ctx, func(tx *TxWrap) error {
-		temp, err := fn(tx)
-		if err != nil {
-			return err
-		}
-		rtn = temp
-		return nil
-	})
-	return rtn, txErr
+	return txwrap.WithTxRtn(ctx, globalDB, fn)
 }
 
 func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
@@ -121,13 +82,7 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
 	if err != nil {
 		return nil, fmt.Errorf("opening iofs: %w", err)
 	}
-	ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
-	defer cancelFn()
-	db, err := GetDB(ctx)
-	if err != nil {
-		return nil, err
-	}
-	mdriver, err := sqlite3migrate.WithInstance(db.DB, &sqlite3migrate.Config{})
+	mdriver, err := sqlite3migrate.WithInstance(globalDB.DB, &sqlite3migrate.Config{})
 	if err != nil {
 		return nil, fmt.Errorf("making blockstore migration driver: %w", err)
 	}
@@ -138,7 +93,7 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
 	return m, nil
 }
 
-func MigrateBlockstore(shouldClose bool) error {
+func MigrateBlockstore() error {
 	log.Printf("migrate blockstore\n")
 	m, err := MakeBlockstoreMigrate()
 	if err != nil {
@@ -151,9 +106,6 @@ func MigrateBlockstore() error {
 	if err != nil {
 		return fmt.Errorf("cannot get current migration version: %v", err)
 	}
-	if shouldClose {
-		defer m.Close()
-	}
 	err = m.Up()
 	if err != nil && err != migrate.ErrNoChange {
 		return fmt.Errorf("migrating blockstore: %w", err)
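Note: the SingleConnDBGetter machinery disappears above because database/sql can serialize access on its own: SetMaxOpenConns(1) queues every caller on a single connection, which together with WAL mode and a busy timeout is a common way to avoid SQLITE_BUSY under concurrent writers, and txwrap v0.2.0 takes the *sqlx.DB handle directly. A minimal sketch of the resulting shape (the DSN mirrors the patch; the helper name is illustrative):

    // openSingleConn opens a sqlite db and serializes all access on one connection.
    func openSingleConn(dsn string) (*sqlx.DB, error) {
        d, err := sqlx.Open("sqlite3", dsn)
        if err != nil {
            return nil, err
        }
        d.DB.SetMaxOpenConns(1) // transactions queue instead of racing
        return d, nil
    }

    // txwrap then wraps fn in BEGIN/COMMIT and rolls back if fn errors:
    //   err = txwrap.WithTx(ctx, d, func(tx *txwrap.TxWrap) error { ... })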
From 75d2c48c5761caa19ad1cb70bb95526cd2c89826 Mon Sep 17 00:00:00 2001
From: sawka
Date: Mon, 13 May 2024 11:55:48 -0700
Subject: [PATCH 08/10] test a write that spans two blocks

---
 pkg/blockstore/blockstore_test.go | 41 ++++++++++++++++++++++++++++++-
 1 file changed, 40 insertions(+), 1 deletion(-)

diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go
index 8ea04201b..5f44f9f32 100644
--- a/pkg/blockstore/blockstore_test.go
+++ b/pkg/blockstore/blockstore_test.go
@@ -4,7 +4,9 @@
 package blockstore
 
 import (
+	"bytes"
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -14,7 +16,7 @@ import (
 func initDb(t *testing.T) {
 	t.Logf("initializing db for %q", t.Name())
 	useTestingDb = true
-	partDataSize = 64
+	partDataSize = 50
 	err := InitBlockstore()
 	if err != nil {
 		t.Fatalf("error initializing blockstore: %v", err)
@@ -183,3 +185,40 @@ func TestAppend(t *testing.T) {
 	checkFileSize(t, ctx, blockId, fileName, 11)
 	checkFileData(t, ctx, blockId, fileName, "hello world")
 }
+
+func makeText(n int) string {
+	var buf bytes.Buffer
+	for i := 0; i < n; i++ {
+		buf.WriteByte(byte('0' + (i % 10)))
+	}
+	return buf.String()
+}
+
+func TestMultiPart(t *testing.T) {
+	initDb(t)
+	defer cleanupDb(t)
+
+	ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancelFn()
+	blockId := uuid.New().String()
+	fileName := "m2"
+	data := makeText(80)
+	err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
+	if err != nil {
+		t.Fatalf("error creating file: %v", err)
+	}
+	err = GBS.AppendData(ctx, blockId, fileName, []byte(data))
+	if err != nil {
+		t.Fatalf("error appending data: %v", err)
+	}
+	checkFileSize(t, ctx, blockId, fileName, 80)
+	checkFileData(t, ctx, blockId, fileName, data)
+	_, barr, err := GBS.ReadAt(ctx, blockId, fileName, 42, 10)
+	if err != nil {
+		t.Fatalf("error reading data: %v", err)
+	}
+	if string(barr) != data[42:52] {
+		t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr))
+	}
+	fmt.Print(GBS.dump())
+}
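Note: with partDataSize pinned to 50, the 80-byte test file spans parts 0 and 1, so the ReadAt at offset 42 for 10 bytes deliberately straddles the part boundary. The offset-to-part arithmetic, worked out in isolation:

    partDataSize := int64(50)
    for _, off := range []int64{42, 49, 50, 79} {
        fmt.Printf("offset %2d -> part %d, partOffset %2d\n",
            off, off/partDataSize, off%partDataSize)
    }
    // offset 42 -> part 0, partOffset 42
    // offset 49 -> part 0, partOffset 49
    // offset 50 -> part 1, partOffset  0
    // offset 79 -> part 1, partOffset 29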
From b5f7ff699c9aeb05ae299274e22c50a7c1dc84f3 Mon Sep 17 00:00:00 2001
From: sawka
Date: Mon, 13 May 2024 12:00:18 -0700
Subject: [PATCH 09/10] fix size bug, add more multi/writeat testing

---
 pkg/blockstore/blockstore.go      |  2 +-
 pkg/blockstore/blockstore_test.go | 17 +++++++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go
index 4329624fc..6c8714710 100644
--- a/pkg/blockstore/blockstore.go
+++ b/pkg/blockstore/blockstore.go
@@ -303,7 +303,7 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str
 }
 
 func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) {
-	endWrite := entry.FileEntry.File.Size + int64(len(data))
+	endWrite := offset + int64(len(data))
 	entry.writeAt(offset, data)
 	if endWrite > entry.FileEntry.File.Size {
 		entry.FileEntry.File.Size = endWrite
diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go
index 5f44f9f32..c00c0b79d 100644
--- a/pkg/blockstore/blockstore_test.go
+++ b/pkg/blockstore/blockstore_test.go
@@ -6,7 +6,6 @@ package blockstore
 import (
 	"bytes"
 	"context"
-	"fmt"
 	"testing"
 	"time"
 
@@ -158,6 +157,17 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin
 	}
 }
 
+func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) {
+	_, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data)))
+	if err != nil {
+		t.Errorf("error reading data for file %q: %v", name, err)
+		return
+	}
+	if string(rdata) != data {
+		t.Errorf("data mismatch for file %q: expected %q, got %q", name, data, string(rdata))
+	}
+}
+
 func TestAppend(t *testing.T) {
 	initDb(t)
 	defer cleanupDb(t)
@@ -220,5 +230,8 @@ func TestMultiPart(t *testing.T) {
 	if string(barr) != data[42:52] {
 		t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr))
 	}
-	fmt.Print(GBS.dump())
+	GBS.WriteAt(ctx, blockId, fileName, 49, []byte("world"))
+	checkFileSize(t, ctx, blockId, fileName, 80)
+	checkFileDataAt(t, ctx, blockId, fileName, 49, "world")
+	checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4")
 }
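Note: the one-line writeAt_nolock fix is about overwrites, not appends. Writing 5 bytes at offset 49 of an 80-byte file touches bytes 49 through 53; the old formula grew the size anyway:

    size, offset, n := int64(80), int64(49), int64(5)
    oldEnd := size + n   // 85: wrongly grows the file on an in-place overwrite
    newEnd := offset + n // 54: last byte touched is 53, so Size stays 80
    if newEnd > size {
        size = newEnd
    }

The "8world4" assertion at offset 48 pins down exactly this case: one byte before the overwrite, the five overwritten bytes, and one byte after.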
err) + } + checkFileSize(t, ctx, blockId, fileName, 80) + checkFileData(t, ctx, blockId, fileName, data) + _, barr, err := GBS.ReadAt(ctx, blockId, fileName, 42, 10) + if err != nil { + t.Fatalf("error reading data: %v", err) + } + if string(barr) != data[42:52] { + t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr)) + } + fmt.Print(GBS.dump()) +} From b5f7ff699c9aeb05ae299274e22c50a7c1dc84f3 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 13 May 2024 12:00:18 -0700 Subject: [PATCH 09/10] fix size bug, add more multi/writeat testing --- pkg/blockstore/blockstore.go | 2 +- pkg/blockstore/blockstore_test.go | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 4329624fc..6c8714710 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -303,7 +303,7 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str } func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) { - endWrite := entry.FileEntry.File.Size + int64(len(data)) + endWrite := offset + int64(len(data)) entry.writeAt(offset, data) if endWrite > entry.FileEntry.File.Size { entry.FileEntry.File.Size = endWrite diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 5f44f9f32..c00c0b79d 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -6,7 +6,6 @@ package blockstore import ( "bytes" "context" - "fmt" "testing" "time" @@ -158,6 +157,17 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin } } +func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) { + _, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data))) + if err != nil { + t.Errorf("error reading data for file %q: %v", name, err) + return + } + if string(rdata) != data { + t.Errorf("data mismatch for file %q: expected %q, got %q", name, data, string(rdata)) + } +} + func TestAppend(t *testing.T) { initDb(t) defer cleanupDb(t) @@ -220,5 +230,8 @@ func TestMultiPart(t *testing.T) { if string(barr) != data[42:52] { t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr)) } - fmt.Print(GBS.dump()) + GBS.WriteAt(ctx, blockId, fileName, 49, []byte("world")) + checkFileSize(t, ctx, blockId, fileName, 80) + checkFileDataAt(t, ctx, blockId, fileName, 49, "world") + checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4") } From a10f0d99eacdfe27ec65d8522efdc1b946b334be Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 13 May 2024 13:40:25 -0700 Subject: [PATCH 10/10] progress, working on flush --- pkg/blockstore/blockstore.go | 7 +-- pkg/blockstore/blockstore_cache.go | 84 ++++++++++++++++++++++++------ pkg/blockstore/blockstore_dbops.go | 11 +--- pkg/blockstore/blockstore_test.go | 1 + 4 files changed, 75 insertions(+), 28 deletions(-) diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 6c8714710..849aff884 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -11,6 +11,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" ) @@ -19,11 +20,11 @@ const DefaultFlushTime = 5 * time.Second const NoPartIdx = -1 var partDataSize int64 = DefaultPartDataSize // overridden in tests +var stopFlush = &atomic.Bool{} var GBS *BlockStore = &BlockStore{ - Lock: &sync.Mutex{}, - Cache: make(map[cacheKey]*CacheEntry), - FlushTime: DefaultFlushTime, + Lock: 
diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go
index b9b85324a..757a92a8d 100644
--- a/pkg/blockstore/blockstore_cache.go
+++ b/pkg/blockstore/blockstore_cache.go
@@ -5,10 +5,10 @@
 package blockstore
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"sync"
 	"sync/atomic"
-	"time"
 )
 
 type cacheKey struct {
@@ -17,9 +17,10 @@ type cacheKey struct {
 }
 
 type DataCacheEntry struct {
-	Dirty   *atomic.Bool
-	PartIdx int
-	Data    []byte // capacity is always BlockDataPartSize
+	Dirty    *atomic.Bool
+	Flushing *atomic.Bool
+	PartIdx  int
+	Data     []byte // capacity is always BlockDataPartSize
 }
 
 type FileCacheEntry struct {
@@ -70,7 +71,15 @@ func (s *BlockStore) dump() string {
 		buf.WriteString("\n")
 	}
 	return buf.String()
+}
 
+func makeDataCacheEntry(partIdx int) *DataCacheEntry {
+	return &DataCacheEntry{
+		Dirty:    &atomic.Bool{},
+		Flushing: &atomic.Bool{},
+		PartIdx:  partIdx,
+		Data:     make([]byte, 0, partDataSize),
+	}
 }
 
 // for testing
@@ -92,16 +101,24 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry {
 		e.DataEntries = append(e.DataEntries, nil)
 	}
 	if create && e.DataEntries[partIdx] == nil {
-		e.DataEntries[partIdx] = &DataCacheEntry{
-			PartIdx: partIdx,
-			Data:    make([]byte, 0, partDataSize),
-			Dirty:   &atomic.Bool{},
-		}
+		e.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
 	}
 	return e.DataEntries[partIdx]
 }
 
+func (dce *DataCacheEntry) clonePart() *DataCacheEntry {
+	rtn := makeDataCacheEntry(dce.PartIdx)
+	rtn.Data = append(rtn.Data, dce.Data...)
+	if dce.Dirty.Load() {
+		rtn.Dirty.Store(true)
+	}
+	return rtn
+}
+
-func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 {
+func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
+	if dce.Flushing.Load() {
+		dce = dce.clonePart()
+	}
 	leftInPart := partDataSize - offset
 	toWrite := int64(len(data))
 	if toWrite > leftInPart {
@@ -112,7 +129,7 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCa
 	}
 	copy(dce.Data[offset:], data[:toWrite])
 	dce.Dirty.Store(true)
-	return toWrite
+	return toWrite, dce
 }
 
 func (entry *CacheEntry) writeAt(offset int64, data []byte) {
@@ -124,16 +141,16 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte) {
 	}
 	partOffset := offset % partDataSize
 	partData := entry.ensurePart(partIdx, true)
-	nw := partData.writeToPart(partOffset, data)
+	nw, newDce := partData.writeToPart(partOffset, data)
+	entry.DataEntries[partIdx] = newDce
 	data = data[nw:]
 	offset += nw
 	}
 }
 
 type BlockStore struct {
-	Lock      *sync.Mutex
-	Cache     map[cacheKey]*CacheEntry
-	FlushTime time.Duration
+	Lock  *sync.Mutex
+	Cache map[cacheKey]*CacheEntry
 }
 
 func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) {
@@ -236,3 +253,40 @@ func (e *CacheEntry) copyOrCreateFileEntry(dbFile *BlockFile) *FileCacheEntry {
 		File: *e.FileEntry.File.DeepCopy(),
 	}
 }
+
+// also sets Flushing to true
+func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, []*DataCacheEntry) {
+	s.Lock.Lock()
+	defer s.Lock.Unlock()
+	if entry.Deleted || entry.FileEntry == nil {
+		return nil, nil
+	}
+	var dirtyData []*DataCacheEntry
+	for _, dce := range entry.DataEntries {
+		if dce != nil && dce.Dirty.Load() {
+			dirtyData = append(dirtyData, dce)
+		}
+	}
+	if !entry.FileEntry.Dirty.Load() && len(dirtyData) == 0 {
+		return nil, nil
+	}
+	for _, data := range dirtyData {
+		data.Flushing.Store(true)
+	}
+	return entry.FileEntry, dirtyData
+}
+
+//
 flushEntry writes the entry's dirty file/data entries to the db;
+// if the entry is clean (nothing to write), it just tries to release the cache entry
+func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error {
+	fileEntry, dirtyData := s.getDirtyDataEntries(entry)
+	if fileEntry == nil && len(dirtyData) == 0 {
+		s.tryDeleteCacheEntry(entry.BlockId, entry.Name)
+		return nil
+	}
+	err := dbWriteCacheEntry(ctx, fileEntry, dirtyData)
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go
index 03d338e7d..1b0c31061 100644
--- a/pkg/blockstore/blockstore_dbops.go
+++ b/pkg/blockstore/blockstore_dbops.go
@@ -36,16 +36,6 @@ func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error)
 	})
 }
 
-func dbDeleteBlock(ctx context.Context, blockId string) error {
-	return WithTx(ctx, func(tx *TxWrap) error {
-		query := "DELETE FROM db_block_file WHERE blockid = ?"
-		tx.Exec(query, blockId)
-		query = "DELETE FROM db_block_data WHERE blockid = ?"
-		tx.Exec(query, blockId)
-		return nil
-	})
-}
-
 func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) {
 	return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) {
 		query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?"
@@ -112,6 +102,7 @@ func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntri
 	for _, dataEntry := range dataEntries {
 		if dataEntry != nil {
 			dataEntry.Dirty.Store(false)
+			dataEntry.Flushing.Store(false)
 		}
 	}
 }
diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go
index c00c0b79d..090bc46b6 100644
--- a/pkg/blockstore/blockstore_test.go
+++ b/pkg/blockstore/blockstore_test.go
@@ -16,6 +16,7 @@ func initDb(t *testing.T) {
 	t.Logf("initializing db for %q", t.Name())
 	useTestingDb = true
 	partDataSize = 50
+	stopFlush.Store(true)
 	err := InitBlockstore()
 	if err != nil {
 		t.Fatalf("error initializing blockstore: %v", err)
 	}
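Note: this last patch is groundwork for an asynchronous flusher. getDirtyDataEntries marks dirty parts as Flushing so they can be written to the db outside the cache lock, and writeToPart clones a part that is mid-flush instead of mutating bytes a concurrent flush may be reading. The driving loop does not exist yet in this series; a hypothetical sketch of what it might look like, assuming a getAllEntries accessor that this patch has not added, with stopFlush as the test gate introduced above:

    func runFlusher(ctx context.Context) {
        ticker := time.NewTicker(DefaultFlushTime)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                if stopFlush.Load() {
                    continue // tests disable background flushing
                }
                for _, entry := range GBS.getAllEntries() { // hypothetical accessor
                    if err := GBS.flushEntry(ctx, entry); err != nil {
                        log.Printf("[blockstore] flush error: %v\n", err)
                    }
                }
            }
        }
    }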