s3 suggestions + widget cache (#1987)

This commit is contained in:
Mike Sawka 2025-02-18 15:15:12 -08:00 committed by GitHub
parent 7c25ebfcb1
commit da1f8dea38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 298 additions and 52 deletions

View File

@ -122,6 +122,11 @@ class RpcApiType {
return client.wshRpcCall("dispose", data, opts);
}
// command "disposesuggestions" [call]
DisposeSuggestionsCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("disposesuggestions", data, opts);
}
// command "eventpublish" [call]
EventPublishCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventpublish", data, opts);

View File

@ -12,7 +12,7 @@ interface SuggestionControlProps {
anchorRef: React.RefObject<HTMLElement>;
isOpen: boolean;
onClose: () => void;
onSelect: (item: SuggestionType, queryStr: string) => void;
onSelect: (item: SuggestionType, queryStr: string) => boolean;
onTab?: (item: SuggestionType, queryStr: string) => string;
fetchSuggestions: SuggestionsFnType;
className?: string;
@ -256,8 +256,11 @@ const SuggestionControlInner: React.FC<SuggestionControlInnerProps> = ({
} else if (e.key === "Enter") {
e.preventDefault();
e.stopPropagation();
let suggestion: SuggestionType = null;
if (selectedIndex >= 0 && selectedIndex < suggestions.length) {
onSelect(suggestions[selectedIndex], query);
suggestion = suggestions[selectedIndex];
}
if (onSelect(suggestion, query)) {
onClose();
}
} else if (e.key === "Escape") {

View File

@ -812,6 +812,10 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) {
useEffect(() => {
const filtered = unfilteredData?.filter((fileInfo) => {
if (fileInfo.name == null) {
console.log("fileInfo.name is null", fileInfo);
return false;
}
if (!showHiddenFiles && fileInfo.name.startsWith(".") && fileInfo.name != "..") {
return false;
}

View File

@ -1100,24 +1100,30 @@ const fetchSuggestions = async (
query: string,
reqContext: SuggestionRequestContext
): Promise<FetchSuggestionsResponse> => {
const conn = await globalStore.get(model.connection);
let route = makeConnRoute(conn);
if (isBlank(conn) || conn.startsWith("aws:")) {
route = null;
}
if (reqContext?.dispose) {
RpcApi.DisposeSuggestionsCommand(TabRpcClient, reqContext.widgetid, { noresponse: true, route: route });
return null;
}
const fileInfo = await globalStore.get(model.statFile);
if (fileInfo == null) {
return null;
}
const conn = await globalStore.get(model.connection);
return await RpcApi.FetchSuggestionsCommand(
TabRpcClient,
{
suggestiontype: "file",
"file:cwd": fileInfo.path,
query: query,
widgetid: reqContext.widgetid,
reqnum: reqContext.reqnum,
},
{
route: makeConnRoute(conn),
}
);
const sdata = {
suggestiontype: "file",
"file:cwd": fileInfo.path,
query: query,
widgetid: reqContext.widgetid,
reqnum: reqContext.reqnum,
"file:connection": conn,
};
return await RpcApi.FetchSuggestionsCommand(TabRpcClient, sdata, {
route: route,
});
};
function PreviewView({
@ -1135,8 +1141,18 @@ function PreviewView({
if (connStatus?.status != "connected") {
return null;
}
const handleSelect = (s: SuggestionType) => {
const handleSelect = (s: SuggestionType, queryStr: string): boolean => {
console.log("handleSelect", s, queryStr);
if (s == null) {
if (isBlank(queryStr)) {
globalStore.set(model.openFileModal, false);
return true;
}
model.handleOpenFile(queryStr);
return true;
}
model.handleOpenFile(s["file:path"]);
return true;
};
const handleTab = (s: SuggestionType, query: string): string => {
if (s["mime:type"] == "directory") {

View File

@ -616,9 +616,10 @@ const BookmarkTypeahead = memo(
onClose={() => model.setTypeaheadOpen(false)}
onSelect={(suggestion) => {
if (suggestion == null || suggestion.type != "url") {
return;
return true;
}
model.loadUrl(suggestion["url:url"], "bookmark-typeahead");
return true;
}}
fetchSuggestions={model.fetchBookmarkSuggestions}
placeholderText="Open Bookmark..."

View File

@ -4,61 +4,231 @@
package suggestion
import (
"container/list"
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"golang.org/x/sync/singleflight"
)
const ListDirChanSize = 50
// cache settings
const (
maxCacheEntries = 20
cacheTTL = 60 * time.Second
)
type cacheEntry struct {
key string
value []DirEntryResult
expiration time.Time
lruElement *list.Element
}
var (
cache = make(map[string]*cacheEntry)
cacheLRU = list.New()
cacheMu sync.Mutex
// group ensures only one listing per key is executed concurrently.
group singleflight.Group
)
func init() {
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for range ticker.C {
cleanCache()
}
}()
}
func cleanCache() {
cacheMu.Lock()
defer cacheMu.Unlock()
now := time.Now()
for key, entry := range cache {
if now.After(entry.expiration) {
cacheLRU.Remove(entry.lruElement)
delete(cache, key)
}
}
}
func getCache(key string) ([]DirEntryResult, bool) {
cacheMu.Lock()
defer cacheMu.Unlock()
entry, ok := cache[key]
if !ok {
return nil, false
}
if time.Now().After(entry.expiration) {
// expired
cacheLRU.Remove(entry.lruElement)
delete(cache, key)
return nil, false
}
// update LRU order
cacheLRU.MoveToFront(entry.lruElement)
return entry.value, true
}
func setCache(key string, value []DirEntryResult) {
cacheMu.Lock()
defer cacheMu.Unlock()
// if already exists, update it
if entry, ok := cache[key]; ok {
entry.value = value
entry.expiration = time.Now().Add(cacheTTL)
cacheLRU.MoveToFront(entry.lruElement)
return
}
// evict if at capacity
if cacheLRU.Len() >= maxCacheEntries {
oldest := cacheLRU.Back()
if oldest != nil {
oldestKey := oldest.Value.(string)
if oldEntry, ok := cache[oldestKey]; ok {
cacheLRU.Remove(oldEntry.lruElement)
delete(cache, oldestKey)
}
}
}
// add new entry
elem := cacheLRU.PushFront(key)
cache[key] = &cacheEntry{
key: key,
value: value,
expiration: time.Now().Add(cacheTTL),
lruElement: elem,
}
}
// cacheDispose clears all cache entries for the provided widgetId.
func cacheDispose(widgetId string) {
cacheMu.Lock()
defer cacheMu.Unlock()
prefix := widgetId + "|"
for key, entry := range cache {
if strings.HasPrefix(key, prefix) {
cacheLRU.Remove(entry.lruElement)
delete(cache, key)
}
}
}
type DirEntryResult struct {
Entry fs.DirEntry
Err error
}
func listDirectory(ctx context.Context, dir string, maxFiles int) (<-chan DirEntryResult, error) {
// Open the directory outside the goroutine for early error reporting.
f, err := os.Open(dir)
if err != nil {
return nil, err
func listS3Directory(ctx context.Context, widgetId string, conn string, dir string, maxFiles int) (<-chan DirEntryResult, error) {
if !strings.HasPrefix(conn, "aws:") {
return nil, fmt.Errorf("invalid S3 connection: %s", conn)
}
key := widgetId + "|" + dir
if cached, ok := getCache(key); ok {
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range cached {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
// Ensure we have a directory.
fi, err := f.Stat()
// Ensure only one operation populates the cache for this key.
value, err, _ := group.Do(key, func() (interface{}, error) {
path := conn + ":s3://" + dir
entries, err := fileshare.ListEntries(ctx, path, &wshrpc.FileListOpts{Limit: maxFiles})
if err != nil {
return nil, err
}
var results []DirEntryResult
for _, entry := range entries {
mockEntry := &MockDirEntry{
NameStr: entry.Name,
IsDirVal: entry.IsDir,
FileMode: entry.Mode,
}
results = append(results, DirEntryResult{Entry: mockEntry})
}
return results, nil
})
if err != nil {
f.Close()
return nil, err
}
if !fi.IsDir() {
f.Close()
return nil, fmt.Errorf("%s is not a directory", dir)
}
results := value.([]DirEntryResult)
setCache(key, results)
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
// Make sure to close the directory when done.
defer f.Close()
// Read up to maxFiles entries.
entries, err := f.ReadDir(maxFiles)
if err != nil {
utilfn.SendWithCtxCheck(ctx, ch, DirEntryResult{Err: err})
return
}
// Send each entry over the channel.
for _, entry := range entries {
ok := utilfn.SendWithCtxCheck(ctx, ch, DirEntryResult{Entry: entry})
if !ok {
for _, r := range results {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
func listDirectory(ctx context.Context, widgetId string, dir string, maxFiles int) (<-chan DirEntryResult, error) {
key := widgetId + "|" + dir
if cached, ok := getCache(key); ok {
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range cached {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
// Use singleflight to ensure only one listing operation occurs per key.
value, err, _ := group.Do(key, func() (interface{}, error) {
f, err := os.Open(dir)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
}
if !fi.IsDir() {
return nil, fmt.Errorf("%s is not a directory", dir)
}
entries, err := f.ReadDir(maxFiles)
if err != nil {
return nil, err
}
var results []DirEntryResult
for _, entry := range entries {
results = append(results, DirEntryResult{Entry: entry})
}
// Add parent directory (“..”) entry if not at the filesystem root.
if filepath.Dir(dir) != dir {
mockDir := &MockDirEntry{
@ -66,7 +236,25 @@ func listDirectory(ctx context.Context, dir string, maxFiles int) (<-chan DirEnt
IsDirVal: true,
FileMode: fs.ModeDir | 0755,
}
utilfn.SendWithCtxCheck(ctx, ch, DirEntryResult{Entry: mockDir})
results = append(results, DirEntryResult{Entry: mockDir})
}
return results, nil
})
if err != nil {
return nil, err
}
results := value.([]DirEntryResult)
setCache(key, results)
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range results {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil

View File

@ -132,6 +132,10 @@ func resolveFileQuery(cwd string, query string) (string, string, string, error)
return cwd, "", query, nil
}
func DisposeSuggestions(ctx context.Context, widgetId string) {
cacheDispose(widgetId)
}
func FetchSuggestions(ctx context.Context, data wshrpc.FetchSuggestionsData) (*wshrpc.FetchSuggestionsResponse, error) {
if data.SuggestionType == "file" {
return fetchFileSuggestions(ctx, data)
@ -353,7 +357,7 @@ func (h *scoredEntryHeap) Pop() interface{} {
return x
}
func fetchFileSuggestions(_ context.Context, data wshrpc.FetchSuggestionsData) (*wshrpc.FetchSuggestionsResponse, error) {
func fetchFileSuggestions(ctx context.Context, data wshrpc.FetchSuggestionsData) (*wshrpc.FetchSuggestionsResponse, error) {
// Only support file suggestions.
if data.SuggestionType != "file" {
return nil, fmt.Errorf("unsupported suggestion type: %q", data.SuggestionType)
@ -366,12 +370,20 @@ func fetchFileSuggestions(_ context.Context, data wshrpc.FetchSuggestionsData) (
}
// Use a cancellable context for directory listing.
listingCtx, cancelFn := context.WithCancel(context.Background())
listingCtx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
entriesCh, err := listDirectory(listingCtx, baseDir, 1000)
if err != nil {
return nil, fmt.Errorf("error listing directory: %w", err)
var entriesCh <-chan DirEntryResult
if strings.HasPrefix(data.FileConnection, "aws:") {
entriesCh, err = listS3Directory(listingCtx, data.WidgetId, data.FileConnection, baseDir, 1000)
if err != nil {
return nil, fmt.Errorf("error listing S3 directory: %w", err)
}
} else {
entriesCh, err = listDirectory(listingCtx, data.WidgetId, baseDir, 1000)
if err != nil {
return nil, fmt.Errorf("error listing directory: %w", err)
}
}
const maxEntries = MaxSuggestions // top-k entries

View File

@ -154,6 +154,12 @@ func DisposeCommand(w *wshutil.WshRpc, data wshrpc.CommandDisposeData, opts *wsh
return err
}
// command "disposesuggestions", wshserver.DisposeSuggestionsCommand
func DisposeSuggestionsCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "disposesuggestions", data, opts)
return err
}
// command "eventpublish", wshserver.EventPublishCommand
func EventPublishCommand(w *wshutil.WshRpc, data wps.WaveEvent, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "eventpublish", data, opts)

View File

@ -868,6 +868,11 @@ func (*ServerImpl) FetchSuggestionsCommand(ctx context.Context, data wshrpc.Fetc
return suggestion.FetchSuggestions(ctx, data)
}
func (*ServerImpl) DisposeSuggestionsCommand(ctx context.Context, widgetId string) error {
suggestion.DisposeSuggestions(ctx, widgetId)
return nil
}
func logPrintfDev(format string, args ...interface{}) {
if wavebase.IsDevMode() {
log.Printf(format, args...)

View File

@ -205,6 +205,7 @@ type WshRpcInterface interface {
PathCommand(ctx context.Context, data PathCommandData) (string, error)
SendTelemetryCommand(ctx context.Context) error
FetchSuggestionsCommand(ctx context.Context, data FetchSuggestionsData) (*FetchSuggestionsResponse, error)
DisposeSuggestionsCommand(ctx context.Context, widgetId string) error
GetTabCommand(ctx context.Context, tabId string) (*waveobj.Tab, error)
// connection functions

View File

@ -961,6 +961,11 @@ func (ws *WshServer) FetchSuggestionsCommand(ctx context.Context, data wshrpc.Fe
return suggestion.FetchSuggestions(ctx, data)
}
func (ws *WshServer) DisposeSuggestionsCommand(ctx context.Context, widgetId string) error {
suggestion.DisposeSuggestions(ctx, widgetId)
return nil
}
func (ws *WshServer) GetTabCommand(ctx context.Context, tabId string) (*waveobj.Tab, error) {
tab, err := wstore.DBGet[*waveobj.Tab](ctx, tabId)
if err != nil {

View File

@ -208,7 +208,7 @@ func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool
localRouteId := router.getAnnouncedRoute(routeId)
rpc := router.GetRpc(localRouteId)
if rpc == nil {
log.Printf("[router] no rpc for local route id %q\n", localRouteId)
log.Printf("[router] no rpc for route id %q\n", routeId)
return false
}
rpc.SendRpcMessage(msgBytes)