Merge pull request #1 from wavetermdev/sawka/blockstore

blockstore progress
This commit is contained in:
Mike Sawka 2024-05-13 13:41:18 -07:00 committed by GitHub
commit cf803cf12c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1404 additions and 8 deletions

9
db/db.go Normal file
View File

@ -0,0 +1,9 @@
// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package db
import "embed"
//go:embed migrations-blockstore/*.sql
var BlockstoreMigrationFS embed.FS

View File

@ -0,0 +1,3 @@
DROP TABLE block_file;
DROP TABLE block_data;

View File

@ -0,0 +1,19 @@
CREATE TABLE db_block_file (
blockid varchar(36) NOT NULL,
name varchar(200) NOT NULL,
size bigint NOT NULL,
createdts bigint NOT NULL,
modts bigint NOT NULL,
opts json NOT NULL,
meta json NOT NULL,
PRIMARY KEY (blockid, name)
);
CREATE TABLE db_block_data (
blockid varchar(36) NOT NULL,
name varchar(200) NOT NULL,
partidx int NOT NULL,
data blob NOT NULL,
PRIMARY KEY(blockid, name, partidx)
);

View File

@ -6,6 +6,8 @@ import { Greet } from "./bindings/main/GreetService.js";
import { Events } from "@wailsio/runtime";
import * as rx from "rxjs";
import "/public/style.less";
const jotaiStore = createStore();
const counterSubject = rx.interval(1000).pipe(rx.map((i) => i));
const timeAtom = jotai.atom("No time yet");

View File

@ -5,8 +5,6 @@ import * as React from "react";
import { createRoot } from "react-dom/client";
import { App } from "./app.tsx";
import "./public/style.less";
document.addEventListener("DOMContentLoaded", () => {
let reactElem = React.createElement(App, null, null);
let elem = document.getElementById("main");

16
go.mod
View File

@ -1,10 +1,18 @@
module thenextwave
module github.com/wavetermdev/thenextwave
go 1.22
toolchain go1.22.1
require github.com/wailsapp/wails/v3 v3.0.0-alpha.0
require (
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.4.0
github.com/jmoiron/sqlx v1.4.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/sawka/txwrap v0.2.0
github.com/wailsapp/wails/v3 v3.0.0-alpha.0
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94
)
require (
dario.cat/mergo v1.0.0 // indirect
@ -21,7 +29,8 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
@ -40,6 +49,7 @@ require (
github.com/wailsapp/go-webview2 v1.0.9 // indirect
github.com/wailsapp/mimetype v1.4.1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/mod v0.12.0 // indirect

28
go.sum
View File

@ -1,5 +1,7 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
@ -38,18 +40,29 @@ github.com/go-git/go-git/v5 v5.11.0 h1:XIZc1p+8YzypNr34itUfSvYJcv+eYdTnTvOZ2vD3c
github.com/go-git/go-git/v5 v5.11.0/go.mod h1:6GFcX2P3NM7FPBfpePbpLd21XxsgdAt+lKqXmCUiUCY=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e h1:Q3+PugElBCf4PFpxhErSzU3/PY5sFL5Z6rfv4AbGAck=
github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e/go.mod h1:alcuEEnZsY1WQsagKhZDsoPCRoOijYqhZvPwLG0kzVs=
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@ -63,6 +76,8 @@ github.com/leaanthony/go-ansi-parser v1.6.1 h1:xd8bzARK3dErqkPFtoF9F3/HgN8UQk0ed
github.com/leaanthony/go-ansi-parser v1.6.1/go.mod h1:+vva/2y4alzVmmIEpk9QDhA7vLC5zKDTRwfZGOp3IWU=
github.com/leaanthony/u v1.1.0 h1:2n0d2BwPVXSUq5yhe8lJPHdxevE2qK5G99PMStMZMaI=
github.com/leaanthony/u v1.1.0/go.mod h1:9+o6hejoRljvZ3BzdYlVL0JYCwtnAsVuN9pVTQcaRfI=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc=
github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE=
@ -72,6 +87,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
@ -89,6 +106,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sawka/txwrap v0.2.0 h1:V3LfvKVLULxcYSxdMguLwFyQFMEU9nFDJopg0ZkL+94=
github.com/sawka/txwrap v0.2.0/go.mod h1:wwQ2SQiN4U+6DU/iVPhbvr7OzXAtgZlQCIGuvOswEfA=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
@ -96,6 +115,7 @@ github.com/skeema/knownhosts v1.2.1 h1:SHWdIUa82uGZz+F+47k8SY4QhhI291cXCpopT1lK2
github.com/skeema/knownhosts v1.2.1/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
@ -103,9 +123,13 @@ github.com/wailsapp/go-webview2 v1.0.9 h1:lrU+q0cf1wgLdR69rN+ZnRtMJNaJRrcQ4ELxoO
github.com/wailsapp/go-webview2 v1.0.9/go.mod h1:Uk2BePfCRzttBBjFrBmqKGJd41P6QIHeV9kTgIeOZNo=
github.com/wailsapp/mimetype v1.4.1 h1:pQN9ycO7uo4vsUUuPeHEYoUkLVkaRntMnHJxVwYhwHs=
github.com/wailsapp/mimetype v1.4.1/go.mod h1:9aV5k31bBOv5z6u+QP8TltzvNGJPmNJD4XlAL3U+j3o=
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 h1:/SPCxd4KHlS4eRTreYEXWFRr8WfRFBcChlV5cgkaO58=
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94/go.mod h1:ywoo7DXdYueQ0tTPhVoB+wzRTgERSE19EA3mR6KGRaI=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=

17
main.go
View File

@ -10,6 +10,9 @@ import (
"log"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wailsapp/wails/v3/pkg/application"
)
@ -26,6 +29,18 @@ func (g *GreetService) Greet(name string) string {
}
func main() {
err := wavebase.EnsureWaveHomeDir()
if err != nil {
log.Printf("error ensuring wave home dir: %v\n", err)
return
}
log.Printf("wave home dir: %s\n", wavebase.GetWaveHomeDir())
err = blockstore.InitBlockstore()
if err != nil {
log.Printf("error initializing blockstore: %v\n", err)
return
}
app := application.New(application.Options{
Name: "NextWave",
Description: "The Next Wave Terminal",
@ -65,7 +80,7 @@ func main() {
}()
// blocking
err := app.Run()
err = app.Run()
// If an error occurred while running the application, log it and exit.
if err != nil {

View File

@ -0,0 +1,449 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
// the blockstore package implements a write cache for block files
// it is not a read cache (reads still go to the DB -- unless items are in the cache)
// but all writes only go to the cache, and then the cache is periodically flushed to the DB
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
const DefaultPartDataSize = 64 * 1024
const DefaultFlushTime = 5 * time.Second
const NoPartIdx = -1
var partDataSize int64 = DefaultPartDataSize // overridden in tests
var stopFlush = &atomic.Bool{}
var GBS *BlockStore = &BlockStore{
Lock: &sync.Mutex{},
Cache: make(map[cacheKey]*CacheEntry),
}
type FileOptsType struct {
MaxSize int64
Circular bool
IJson bool
}
type FileMeta = map[string]any
type BlockFile struct {
BlockId string `json:"blockid"`
Name string `json:"name"`
Size int64 `json:"size"`
CreatedTs int64 `json:"createdts"`
ModTs int64 `json:"modts"`
Opts FileOptsType `json:"opts"`
Meta FileMeta `json:"meta"`
}
func copyMeta(meta FileMeta) FileMeta {
newMeta := make(FileMeta)
for k, v := range meta {
newMeta[k] = v
}
return newMeta
}
func (f *BlockFile) DeepCopy() *BlockFile {
if f == nil {
return nil
}
newFile := *f
newFile.Meta = copyMeta(f.Meta)
return &newFile
}
func (BlockFile) UseDBMap() {}
type BlockData struct {
BlockId string `json:"blockid"`
Name string `json:"name"`
PartIdx int `json:"partidx"`
Data []byte `json:"data"`
}
func (BlockData) UseDBMap() {}
// synchronous (does not interact with the cache)
func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error {
if opts.MaxSize < 0 {
return fmt.Errorf("max size must be non-negative")
}
if opts.Circular && opts.MaxSize <= 0 {
return fmt.Errorf("circular file must have a max size")
}
if opts.Circular && opts.IJson {
return fmt.Errorf("circular file cannot be ijson")
}
if opts.Circular {
if opts.MaxSize%partDataSize != 0 {
opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
}
}
now := time.Now().UnixMilli()
file := &BlockFile{
BlockId: blockId,
Name: name,
Size: 0,
CreatedTs: now,
ModTs: now,
Opts: opts,
Meta: meta,
}
return dbInsertFile(ctx, file)
}
func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error {
err := dbDeleteFile(ctx, blockId, name)
if err != nil {
return fmt.Errorf("error deleting file: %v", err)
}
s.withLock(blockId, name, false, func(entry *CacheEntry) {
if entry == nil {
return
}
entry.Deleted = true
})
return nil
}
func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
fileNames, err := dbGetBlockFileNames(ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block files: %v", err)
}
for _, name := range fileNames {
s.DeleteFile(ctx, blockId, name)
}
return nil
}
func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
file, ok := s.getFileFromCache(blockId, name)
if ok {
return file, nil
}
return dbGetBlockFile(ctx, blockId, name)
}
func stripNils[T any](arr []*T) []*T {
newArr := make([]*T, 0)
for _, item := range arr {
if item == nil {
continue
}
newArr = append(newArr, item)
}
return newArr
}
func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
files, err := dbGetBlockFiles(ctx, blockId)
if err != nil {
return nil, fmt.Errorf("error getting block files: %v", err)
}
// now we wash the files through the cache
var hasNils bool
for idx, dbFile := range files {
cacheFile, ok := s.getFileFromCache(dbFile.BlockId, dbFile.Name)
if ok {
if cacheFile == nil {
hasNils = true
}
files[idx] = cacheFile
}
}
if hasNils {
files = stripNils(files)
}
return files, nil
}
func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error {
file, ok := s.getFileFromCache(blockId, name)
if !ok {
dbFile, err := dbGetBlockFile(ctx, blockId, name)
if err != nil {
return fmt.Errorf("error getting file: %v", err)
}
file = dbFile
}
if file == nil {
return fmt.Errorf("file not found")
}
var rtnErr error
s.withLock(blockId, name, true, func(entry *CacheEntry) {
if entry.Deleted {
rtnErr = fmt.Errorf("file is deleted")
return
}
newFileEntry := entry.copyOrCreateFileEntry(file)
if merge {
for k, v := range meta {
if v == nil {
delete(newFileEntry.File.Meta, k)
continue
}
newFileEntry.File.Meta[k] = v
}
} else {
newFileEntry.File.Meta = meta
}
entry.FileEntry = newFileEntry
entry.FileEntry.File.ModTs = time.Now().UnixMilli()
entry.Version++
})
return rtnErr
}
func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) {
file, ok := s.getFileFromCache(blockId, name)
if ok {
if file == nil {
return nil, fmt.Errorf("file not found")
}
return file, nil
}
dbFile, err := dbGetBlockFile(ctx, blockId, name)
if err != nil {
return nil, fmt.Errorf("error getting file: %v", err)
}
if dbFile == nil {
return nil, fmt.Errorf("file not found")
}
var rtnErr error
rtnFile := dbFile
s.withLock(blockId, name, true, func(entry *CacheEntry) {
if entry.Deleted {
rtnFile = nil
rtnErr = fmt.Errorf("file is deleted")
return
}
if entry.FileEntry != nil {
// someone beat us to it
rtnFile = entry.FileEntry.File.DeepCopy()
return
}
entry.FileEntry = entry.copyOrCreateFileEntry(dbFile)
// returns dbFile, nil
})
return rtnFile, rtnErr
}
func (f *BlockFile) getLastIncompletePartNum() int {
if f.Size%partDataSize == 0 {
return NoPartIdx
}
return f.partIdxAtOffset(f.Size)
}
func (f *BlockFile) partIdxAtOffset(offset int64) int {
partIdx := int(offset / partDataSize)
if f.Opts.Circular {
maxPart := int(f.Opts.MaxSize / partDataSize)
partIdx = partIdx % maxPart
}
return partIdx
}
// blockfile must be loaded
func (s *BlockStore) loadLastDataBlock(ctx context.Context, blockId string, name string) error {
var partIdx int
err := s.withLockExists(blockId, name, func(entry *CacheEntry) error {
partIdx = entry.FileEntry.File.getLastIncompletePartNum()
return nil
})
if err != nil {
return err
}
if partIdx == NoPartIdx {
return nil
}
return s.loadDataParts(ctx, blockId, name, []int{partIdx})
}
func maxOfIntArr(arr []int) int {
if len(arr) == 0 {
return 0
}
max := arr[0]
for _, v := range arr[1:] {
if v > max {
max = v
}
}
return max
}
func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name string, parts []int) error {
partDataMap, err := dbGetFileParts(ctx, blockId, name, parts)
if err != nil {
return fmt.Errorf("error getting file part: %v", err)
}
maxPart := maxOfIntArr(parts)
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
entry.ensurePart(maxPart, false)
for partIdx, partData := range partDataMap {
if entry.DataEntries[partIdx] != nil {
// someone beat us to it
continue
}
entry.DataEntries[partIdx] = partData
}
return nil
})
}
func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) {
endWrite := offset + int64(len(data))
entry.writeAt(offset, data)
if endWrite > entry.FileEntry.File.Size {
entry.FileEntry.File.Size = endWrite
}
entry.FileEntry.File.ModTs = time.Now().UnixMilli()
entry.Version++
}
func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error {
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
s.writeAt_nolock(entry, entry.FileEntry.File.Size, data)
return nil
})
}
func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
s.pinCacheEntry(blockId, name)
defer s.unpinCacheEntry(blockId, name)
_, err := s.loadFileInfo(ctx, blockId, name)
if err != nil {
return fmt.Errorf("error loading file info: %v", err)
}
err = s.loadLastDataBlock(ctx, blockId, name)
if err != nil {
return fmt.Errorf("error loading last data block: %v", err)
}
err = s.appendDataToCache(blockId, name, data)
if err != nil {
return fmt.Errorf("error appending data: %v", err)
}
return nil
}
func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) {
return dbGetAllBlockIds(ctx)
}
func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error {
s.pinCacheEntry(blockId, name)
defer s.unpinCacheEntry(blockId, name)
file, err := s.loadFileInfo(ctx, blockId, name)
if err != nil {
return fmt.Errorf("error loading file info: %v", err)
}
startWriteIdx := offset
endWriteIdx := offset + int64(len(data))
startPartIdx := file.partIdxAtOffset(startWriteIdx)
endPartIdx := file.partIdxAtOffset(endWriteIdx)
err = s.loadDataParts(ctx, blockId, name, []int{startPartIdx, endPartIdx})
if err != nil {
return fmt.Errorf("error loading data parts: %v", err)
}
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
s.writeAt_nolock(entry, offset, data)
return nil
})
}
// returns (offset, data, error)
// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) {
s.pinCacheEntry(blockId, name)
defer s.unpinCacheEntry(blockId, name)
file, err := s.Stat(ctx, blockId, name)
if err != nil {
return 0, nil, fmt.Errorf("error getting file: %v", err)
}
if file.Opts.Circular {
// we can do this check here because MaxSize for file cannot be modified
if size > file.Opts.MaxSize {
// just read the last maxsize bytes
sizeTooBig := size - file.Opts.MaxSize
offset += sizeTooBig
}
}
var partsNeeded []int
lastPartOffset := (offset + size) % partDataSize
endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize
for i := offset; i < endOffsetOfLastPart; i += partDataSize {
partsNeeded = append(partsNeeded, file.partIdxAtOffset(i))
}
dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded)
if err != nil {
return 0, nil, fmt.Errorf("error loading data parts: %v", err)
}
// wash the entries through the cache
err = s.withLockExists(blockId, name, func(entry *CacheEntry) error {
if offset+size > entry.FileEntry.File.Size {
// limit read to the actual size of the file
size = entry.FileEntry.File.Size - offset
}
for _, partIdx := range partsNeeded {
if len(entry.DataEntries) <= partIdx || entry.DataEntries[partIdx] == nil {
continue
}
dataEntries[partIdx] = entry.DataEntries[partIdx]
}
return nil
})
if err != nil {
return 0, nil, fmt.Errorf("error reconciling cache entries: %v", err)
}
// combine the entries into a single byte slice
// note that we only want part of the first and last part depending on offset and size
var rtn []byte
amtLeftToRead := size
curReadOffset := offset
for amtLeftToRead > 0 {
partIdx := file.partIdxAtOffset(curReadOffset)
partDataEntry := dataEntries[partIdx]
var partData []byte
if partDataEntry == nil {
partData = make([]byte, partDataSize)
} else {
partData = partDataEntry.Data[0:partDataSize]
}
partOffset := curReadOffset % partDataSize
amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...)
amtLeftToRead -= amtToRead
curReadOffset += amtToRead
}
return offset, rtn, nil
}
func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (int64, []byte, error) {
file, err := s.Stat(ctx, blockId, name)
if err != nil {
return 0, nil, fmt.Errorf("error getting file: %v", err)
}
if file == nil {
return 0, nil, fmt.Errorf("file not found")
}
return s.ReadAt(ctx, blockId, name, 0, file.Size)
}
func minInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}

View File

@ -0,0 +1,292 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
)
type cacheKey struct {
BlockId string
Name string
}
type DataCacheEntry struct {
Dirty *atomic.Bool
Flushing *atomic.Bool
PartIdx int
Data []byte // capacity is always BlockDataPartSize
}
type FileCacheEntry struct {
Dirty *atomic.Bool
File BlockFile
}
// invariants:
// - we only modify CacheEntry fields when we are holding the BlockStore lock
// - FileEntry can be nil, if pinned
// - FileEntry.File is never updated in place, the entire FileEntry is replaced
// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced
// - when pinned, the cache entry is never removed
// this allows us to flush the cache entry to disk without holding the lock
type CacheEntry struct {
BlockId string
Name string
Version int
PinCount int
Deleted bool
FileEntry *FileCacheEntry
DataEntries []*DataCacheEntry
}
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, Version: %d, PinCount: %d, Deleted: %v\n", e.BlockId, e.Name, e.Version, e.PinCount, e.Deleted)
if e.FileEntry != nil {
fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File)
}
for i, dce := range e.DataEntries {
if dce != nil {
fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", i, dce.Dirty.Load(), string(dce.Data))
}
}
buf.WriteString("}\n")
return buf.String()
}
func (s *BlockStore) dump() string {
s.Lock.Lock()
defer s.Lock.Unlock()
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache)))
for _, v := range s.Cache {
entryStr := v.dump()
buf.WriteString(entryStr)
buf.WriteString("\n")
}
return buf.String()
}
func makeDataCacheEntry(partIdx int) *DataCacheEntry {
return &DataCacheEntry{
Dirty: &atomic.Bool{},
Flushing: &atomic.Bool{},
PartIdx: partIdx,
Data: make([]byte, 0, partDataSize),
}
}
// for testing
func (s *BlockStore) getCacheSize() int {
s.Lock.Lock()
defer s.Lock.Unlock()
return len(s.Cache)
}
// for testing
func (s *BlockStore) clearCache() {
s.Lock.Lock()
defer s.Lock.Unlock()
s.Cache = make(map[cacheKey]*CacheEntry)
}
func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry {
for len(e.DataEntries) <= partIdx {
e.DataEntries = append(e.DataEntries, nil)
}
if create && e.DataEntries[partIdx] == nil {
e.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
}
return e.DataEntries[partIdx]
}
func (dce *DataCacheEntry) clonePart() *DataCacheEntry {
rtn := makeDataCacheEntry(dce.PartIdx)
copy(rtn.Data, dce.Data)
if dce.Dirty.Load() {
rtn.Dirty.Store(true)
}
return rtn
}
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
if dce.Flushing.Load() {
dce = dce.clonePart()
}
leftInPart := partDataSize - offset
toWrite := int64(len(data))
if toWrite > leftInPart {
toWrite = leftInPart
}
if int64(len(dce.Data)) < offset+toWrite {
dce.Data = dce.Data[:offset+toWrite]
}
copy(dce.Data[offset:], data[:toWrite])
dce.Dirty.Store(true)
return toWrite, dce
}
func (entry *CacheEntry) writeAt(offset int64, data []byte) {
for len(data) > 0 {
partIdx := int(offset / partDataSize)
if entry.FileEntry.File.Opts.Circular {
maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize)
partIdx = partIdx % maxPart
}
partOffset := offset % partDataSize
partData := entry.ensurePart(partIdx, true)
nw, newDce := partData.writeToPart(partOffset, data)
entry.DataEntries[partIdx] = newDce
data = data[nw:]
offset += nw
}
}
type BlockStore struct {
Lock *sync.Mutex
Cache map[cacheKey]*CacheEntry
}
func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
if shouldCreate {
entry = &CacheEntry{
BlockId: blockId,
Name: name,
PinCount: 0,
FileEntry: nil,
DataEntries: nil,
}
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
}
}
f(entry)
}
func (s *BlockStore) withLockExists(blockId string, name string, f func(*CacheEntry) error) error {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil || entry.Deleted || entry.FileEntry == nil {
return fmt.Errorf("file not found")
}
return f(entry)
}
func (s *BlockStore) pinCacheEntry(blockId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
entry = &CacheEntry{
BlockId: blockId,
Name: name,
PinCount: 0,
FileEntry: nil,
DataEntries: nil,
}
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
}
entry.PinCount++
}
func (s *BlockStore) unpinCacheEntry(blockId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
// this is not good
return
}
entry.PinCount--
}
func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return
}
if entry.PinCount > 0 {
return
}
delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
}
// getFileFromCache returns the file from the cache if it exists
// return (file, cached)
func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return nil, false
}
if entry.Deleted {
return nil, true
}
if entry.FileEntry == nil {
return nil, false
}
return entry.FileEntry.File.DeepCopy(), true
}
func (e *CacheEntry) copyOrCreateFileEntry(dbFile *BlockFile) *FileCacheEntry {
if e.FileEntry == nil {
return &FileCacheEntry{
Dirty: &atomic.Bool{},
File: *dbFile,
}
}
return &FileCacheEntry{
Dirty: &atomic.Bool{},
File: *e.FileEntry.File.DeepCopy(),
}
}
// also sets Flushing to true
func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, []*DataCacheEntry) {
s.Lock.Lock()
defer s.Lock.Unlock()
if entry.Deleted || entry.FileEntry == nil {
return nil, nil
}
var dirtyData []*DataCacheEntry
for _, dce := range entry.DataEntries {
if dce != nil && dce.Dirty.Load() {
dirtyData = append(dirtyData, dce)
}
}
if !entry.FileEntry.Dirty.Load() && len(dirtyData) == 0 {
return nil, nil
}
for _, data := range dirtyData {
data.Flushing.Store(true)
}
return entry.FileEntry, dirtyData
}
// clean is true if the block was clean (nothing to write)
// returns (clean, error)
func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error {
fileEntry, dirtyData := s.getDirtyDataEntries(entry)
if fileEntry == nil && len(dirtyData) == 0 {
s.tryDeleteCacheEntry(entry.BlockId, entry.Name)
return nil
}
err := dbWriteCacheEntry(ctx, fileEntry, dirtyData)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,111 @@
package blockstore
import (
"context"
"fmt"
"sync/atomic"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
func dbInsertFile(ctx context.Context, file *BlockFile) error {
// will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error {
query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil
})
}
func dbDeleteFile(ctx context.Context, blockId string, name string) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := "DELETE FROM db_block_file WHERE blockid = ? AND name = ?"
tx.Exec(query, blockId, name)
query = "DELETE FROM db_block_data WHERE blockid = ? AND name = ?"
tx.Exec(query, blockId, name)
return nil
})
}
func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var files []string
query := "SELECT name FROM db_block_file WHERE blockid = ?"
tx.Select(&files, query, blockId)
return files, nil
})
}
func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) {
query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?"
file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name)
return file, nil
})
}
func dbGetAllBlockIds(ctx context.Context) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var ids []string
query := "SELECT DISTINCT blockid FROM db_block_file"
tx.Select(&ids, query)
return ids, nil
})
}
func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
var data []*DataCacheEntry
query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts))
rtn := make(map[int]*DataCacheEntry)
for _, d := range data {
d.Dirty = &atomic.Bool{}
rtn[d.PartIdx] = d
}
return rtn, nil
})
}
func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) {
query := "SELECT * FROM db_block_file WHERE blockid = ?"
files := dbutil.SelectMappable[*BlockFile](tx, query, blockId)
return files, nil
})
}
func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntries []*DataCacheEntry) error {
if fileEntry == nil {
return fmt.Errorf("fileEntry or fileEntry.File is nil")
}
return WithTx(ctx, func(tx *TxWrap) error {
query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
if !tx.Exists(query, fileEntry.File.BlockId, fileEntry.File.Name) {
// since deletion is synchronous this stops us from writing to a deleted file
return fmt.Errorf("file not found in db")
}
if fileEntry.Dirty.Load() {
query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?`
tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, dbutil.QuickJson(fileEntry.File.Opts), dbutil.QuickJson(fileEntry.File.Meta), fileEntry.File.BlockId, fileEntry.File.Name)
}
dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
for _, dataEntry := range dataEntries {
if dataEntry == nil || !dataEntry.Dirty.Load() {
continue
}
tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data)
}
if tx.Err == nil {
// clear dirty flags
fileEntry.Dirty.Store(false)
for _, dataEntry := range dataEntries {
if dataEntry != nil {
dataEntry.Dirty.Store(false)
dataEntry.Flushing.Store(false)
}
}
}
return nil
})
}

View File

@ -0,0 +1,238 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
import (
"bytes"
"context"
"testing"
"time"
"github.com/google/uuid"
)
func initDb(t *testing.T) {
t.Logf("initializing db for %q", t.Name())
useTestingDb = true
partDataSize = 50
stopFlush.Store(true)
err := InitBlockstore()
if err != nil {
t.Fatalf("error initializing blockstore: %v", err)
}
}
func cleanupDb(t *testing.T) {
t.Logf("cleaning up db for %q", t.Name())
if globalDB != nil {
globalDB.Close()
globalDB = nil
}
useTestingDb = false
partDataSize = DefaultPartDataSize
GBS.clearCache()
}
func TestCreate(t *testing.T) {
initDb(t)
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
file, err := GBS.Stat(ctx, blockId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
if file == nil {
t.Fatalf("file not found")
}
if file.BlockId != blockId {
t.Fatalf("block id mismatch")
}
if file.Name != "testfile" {
t.Fatalf("name mismatch")
}
if file.Size != 0 {
t.Fatalf("size mismatch")
}
if file.CreatedTs == 0 {
t.Fatalf("created ts zero")
}
if file.ModTs == 0 {
t.Fatalf("mod ts zero")
}
if file.CreatedTs != file.ModTs {
t.Fatalf("create ts != mod ts")
}
if len(file.Meta) != 0 {
t.Fatalf("meta should have no values")
}
if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 {
t.Fatalf("opts not empty")
}
}
func checkMapsEqual(t *testing.T, m1 map[string]any, m2 map[string]any, msg string) {
if len(m1) != len(m2) {
t.Errorf("%s: map length mismatch", msg)
}
for k, v := range m1 {
if m2[k] != v {
t.Errorf("%s: value mismatch for key %q", msg, k)
}
}
}
func TestSetMeta(t *testing.T) {
initDb(t)
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
if GBS.getCacheSize() != 0 {
t.Errorf("cache size mismatch -- should have 0 entries after create")
}
err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false)
if err != nil {
t.Fatalf("error setting meta: %v", err)
}
file, err := GBS.Stat(ctx, blockId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
if file == nil {
t.Fatalf("file not found")
}
checkMapsEqual(t, map[string]any{"a": 5, "b": "hello", "q": 8}, file.Meta, "meta")
if GBS.getCacheSize() != 1 {
t.Errorf("cache size mismatch")
}
err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true)
if err != nil {
t.Fatalf("error setting meta: %v", err)
}
file, err = GBS.Stat(ctx, blockId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
if file == nil {
t.Fatalf("file not found")
}
checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta")
}
func checkFileSize(t *testing.T, ctx context.Context, blockId string, name string, size int64) {
file, err := GBS.Stat(ctx, blockId, name)
if err != nil {
t.Errorf("error stating file %q: %v", name, err)
return
}
if file == nil {
t.Errorf("file %q not found", name)
return
}
if file.Size != size {
t.Errorf("size mismatch for file %q: expected %d, got %d", name, size, file.Size)
}
}
func checkFileData(t *testing.T, ctx context.Context, blockId string, name string, data string) {
_, rdata, err := GBS.ReadFile(ctx, blockId, name)
if err != nil {
t.Errorf("error reading data for file %q: %v", name, err)
return
}
if string(rdata) != data {
t.Errorf("data mismatch for file %q: expected %q, got %q", name, data, string(rdata))
}
}
func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) {
_, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data)))
if err != nil {
t.Errorf("error reading data for file %q: %v", name, err)
return
}
if string(rdata) != data {
t.Errorf("data mismatch for file %q: expected %q, got %q", name, data, string(rdata))
}
}
func TestAppend(t *testing.T) {
initDb(t)
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
fileName := "t2"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.AppendData(ctx, blockId, fileName, []byte("hello"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
// fmt.Print(GBS.dump())
checkFileSize(t, ctx, blockId, fileName, 5)
checkFileData(t, ctx, blockId, fileName, "hello")
err = GBS.AppendData(ctx, blockId, fileName, []byte(" world"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
// fmt.Print(GBS.dump())
checkFileSize(t, ctx, blockId, fileName, 11)
checkFileData(t, ctx, blockId, fileName, "hello world")
}
func makeText(n int) string {
var buf bytes.Buffer
for i := 0; i < n; i++ {
buf.WriteByte(byte('0' + (i % 10)))
}
return buf.String()
}
func TestMultiPart(t *testing.T) {
initDb(t)
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
fileName := "m2"
data := makeText(80)
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.AppendData(ctx, blockId, fileName, []byte(data))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileSize(t, ctx, blockId, fileName, 80)
checkFileData(t, ctx, blockId, fileName, data)
_, barr, err := GBS.ReadAt(ctx, blockId, fileName, 42, 10)
if err != nil {
t.Fatalf("error reading data: %v", err)
}
if string(barr) != data[42:52] {
t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr))
}
GBS.WriteAt(ctx, blockId, fileName, 49, []byte("world"))
checkFileSize(t, ctx, blockId, fileName, 80)
checkFileDataAt(t, ctx, blockId, fileName, 49, "world")
checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4")
}

136
pkg/blockstore/dbsetup.go Normal file
View File

@ -0,0 +1,136 @@
package blockstore
// setup for blockstore db
// includes migration support and txwrap setup
import (
"context"
"fmt"
"log"
"path"
"time"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/golang-migrate/migrate/v4"
sqlite3migrate "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"github.com/sawka/txwrap"
dbfs "github.com/wavetermdev/thenextwave/db"
)
const BlockstoreDbName = "blockstore.db"
type TxWrap = txwrap.TxWrap
var globalDB *sqlx.DB
var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db)
func InitBlockstore() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
var err error
globalDB, err = MakeDB(ctx)
if err != nil {
return err
}
err = MigrateBlockstore()
if err != nil {
return err
}
log.Printf("blockstore initialized\n")
return nil
}
func GetDBName() string {
scHome := wavebase.GetWaveHomeDir()
return path.Join(scHome, BlockstoreDbName)
}
func MakeDB(ctx context.Context) (*sqlx.DB, error) {
var rtn *sqlx.DB
var err error
if useTestingDb {
dbName := ":memory:"
log.Printf("[db] using in-memory db\n")
rtn, err = sqlx.Open("sqlite3", dbName)
} else {
dbName := GetDBName()
log.Printf("[db] opening db %s\n", dbName)
rtn, err = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName))
}
if err != nil {
return nil, fmt.Errorf("opening db: %w", err)
}
rtn.DB.SetMaxOpenConns(1)
return rtn, nil
}
func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error {
return txwrap.WithTx(ctx, globalDB, fn)
}
func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) {
return txwrap.WithTxRtn(ctx, globalDB, fn)
}
func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
fsVar, err := iofs.New(dbfs.BlockstoreMigrationFS, "migrations-blockstore")
if err != nil {
return nil, fmt.Errorf("opening iofs: %w", err)
}
mdriver, err := sqlite3migrate.WithInstance(globalDB.DB, &sqlite3migrate.Config{})
if err != nil {
return nil, fmt.Errorf("making blockstore migration driver: %w", err)
}
m, err := migrate.NewWithInstance("iofs", fsVar, "sqlite3", mdriver)
if err != nil {
return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err)
}
return m, nil
}
func MigrateBlockstore() error {
log.Printf("migrate blockstore\n")
m, err := MakeBlockstoreMigrate()
if err != nil {
return err
}
curVersion, dirty, err := GetMigrateVersion(m)
if dirty {
return fmt.Errorf("cannot migrate up, database is dirty")
}
if err != nil {
return fmt.Errorf("cannot get current migration version: %v", err)
}
err = m.Up()
if err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("migrating blockstore: %w", err)
}
newVersion, _, err := GetMigrateVersion(m)
if err != nil {
return fmt.Errorf("cannot get new migration version: %v", err)
}
if newVersion != curVersion {
log.Printf("[db] blockstore migration done, version %d -> %d\n", curVersion, newVersion)
}
return nil
}
func GetMigrateVersion(m *migrate.Migrate) (uint, bool, error) {
if m == nil {
var err error
m, err = MakeBlockstoreMigrate()
if err != nil {
return 0, false, err
}
}
curVersion, dirty, err := m.Version()
if err == migrate.ErrNilVersion {
return 0, false, nil
}
return curVersion, dirty, err
}

90
pkg/wavebase/wavebase.go Normal file
View File

@ -0,0 +1,90 @@
package wavebase
import (
"errors"
"fmt"
"io/fs"
"os"
"path"
"strings"
"sync"
)
const DefaultWaveHome = "~/.w2"
const WaveHomeVarName = "WAVETERM_HOME"
const WaveDevVarName = "WAVETERM_DEV"
const HomeVarName = "HOME"
var baseLock = &sync.Mutex{}
var ensureDirCache = map[string]bool{}
func IsDevMode() bool {
pdev := os.Getenv(WaveDevVarName)
return pdev != ""
}
func GetHomeDir() string {
homeVar := os.Getenv(HomeVarName)
if homeVar == "" {
return "/"
}
return homeVar
}
func ExpandHomeDir(pathStr string) string {
if pathStr != "~" && !strings.HasPrefix(pathStr, "~/") {
return pathStr
}
homeDir := GetHomeDir()
if pathStr == "~" {
return homeDir
}
return path.Join(homeDir, pathStr[2:])
}
func GetWaveHomeDir() string {
homeVar := os.Getenv(WaveHomeVarName)
if homeVar != "" {
return ExpandHomeDir(homeVar)
}
return ExpandHomeDir(DefaultWaveHome)
}
func EnsureWaveHomeDir() error {
return CacheEnsureDir(GetWaveHomeDir(), "wavehome", 0700, "wave home directory")
}
func CacheEnsureDir(dirName string, cacheKey string, perm os.FileMode, dirDesc string) error {
baseLock.Lock()
ok := ensureDirCache[cacheKey]
baseLock.Unlock()
if ok {
return nil
}
err := TryMkdirs(dirName, perm, dirDesc)
if err != nil {
return err
}
baseLock.Lock()
ensureDirCache[cacheKey] = true
baseLock.Unlock()
return nil
}
func TryMkdirs(dirName string, perm os.FileMode, dirDesc string) error {
info, err := os.Stat(dirName)
if errors.Is(err, fs.ErrNotExist) {
err = os.MkdirAll(dirName, perm)
if err != nil {
return fmt.Errorf("cannot make %s %q: %w", dirDesc, dirName, err)
}
info, err = os.Stat(dirName)
}
if err != nil {
return fmt.Errorf("error trying to stat %s: %w", dirDesc, err)
}
if !info.IsDir() {
return fmt.Errorf("%s %q must be a directory", dirDesc, dirName)
}
return nil
}