diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts index a73bded77..9ca02a5dd 100644 --- a/frontend/app/store/wshclientapi.ts +++ b/frontend/app/store/wshclientapi.ts @@ -122,6 +122,11 @@ class RpcApiType { return client.wshRpcCall("dispose", data, opts); } + // command "disposesuggestions" [call] + DisposeSuggestionsCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("disposesuggestions", data, opts); + } + // command "eventpublish" [call] EventPublishCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise { return client.wshRpcCall("eventpublish", data, opts); diff --git a/frontend/app/suggestion/suggestion.tsx b/frontend/app/suggestion/suggestion.tsx index 9a4e77ede..0150f380e 100644 --- a/frontend/app/suggestion/suggestion.tsx +++ b/frontend/app/suggestion/suggestion.tsx @@ -12,7 +12,7 @@ interface SuggestionControlProps { anchorRef: React.RefObject; isOpen: boolean; onClose: () => void; - onSelect: (item: SuggestionType, queryStr: string) => void; + onSelect: (item: SuggestionType, queryStr: string) => boolean; onTab?: (item: SuggestionType, queryStr: string) => string; fetchSuggestions: SuggestionsFnType; className?: string; @@ -256,8 +256,11 @@ const SuggestionControlInner: React.FC = ({ } else if (e.key === "Enter") { e.preventDefault(); e.stopPropagation(); + let suggestion: SuggestionType = null; if (selectedIndex >= 0 && selectedIndex < suggestions.length) { - onSelect(suggestions[selectedIndex], query); + suggestion = suggestions[selectedIndex]; + } + if (onSelect(suggestion, query)) { onClose(); } } else if (e.key === "Escape") { diff --git a/frontend/app/view/preview/directorypreview.tsx b/frontend/app/view/preview/directorypreview.tsx index 70f8dd9bd..6350a840a 100644 --- a/frontend/app/view/preview/directorypreview.tsx +++ b/frontend/app/view/preview/directorypreview.tsx @@ -812,6 +812,10 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) { useEffect(() => { const filtered = unfilteredData?.filter((fileInfo) => { + if (fileInfo.name == null) { + console.log("fileInfo.name is null", fileInfo); + return false; + } if (!showHiddenFiles && fileInfo.name.startsWith(".") && fileInfo.name != "..") { return false; } diff --git a/frontend/app/view/preview/preview.tsx b/frontend/app/view/preview/preview.tsx index 09b961fae..afdc8cbe2 100644 --- a/frontend/app/view/preview/preview.tsx +++ b/frontend/app/view/preview/preview.tsx @@ -1100,24 +1100,30 @@ const fetchSuggestions = async ( query: string, reqContext: SuggestionRequestContext ): Promise => { + const conn = await globalStore.get(model.connection); + let route = makeConnRoute(conn); + if (isBlank(conn) || conn.startsWith("aws:")) { + route = null; + } + if (reqContext?.dispose) { + RpcApi.DisposeSuggestionsCommand(TabRpcClient, reqContext.widgetid, { noresponse: true, route: route }); + return null; + } const fileInfo = await globalStore.get(model.statFile); if (fileInfo == null) { return null; } - const conn = await globalStore.get(model.connection); - return await RpcApi.FetchSuggestionsCommand( - TabRpcClient, - { - suggestiontype: "file", - "file:cwd": fileInfo.path, - query: query, - widgetid: reqContext.widgetid, - reqnum: reqContext.reqnum, - }, - { - route: makeConnRoute(conn), - } - ); + const sdata = { + suggestiontype: "file", + "file:cwd": fileInfo.path, + query: query, + widgetid: reqContext.widgetid, + reqnum: reqContext.reqnum, + "file:connection": conn, + }; + return await RpcApi.FetchSuggestionsCommand(TabRpcClient, sdata, { + route: route, + }); }; function PreviewView({ @@ -1135,8 +1141,18 @@ function PreviewView({ if (connStatus?.status != "connected") { return null; } - const handleSelect = (s: SuggestionType) => { + const handleSelect = (s: SuggestionType, queryStr: string): boolean => { + console.log("handleSelect", s, queryStr); + if (s == null) { + if (isBlank(queryStr)) { + globalStore.set(model.openFileModal, false); + return true; + } + model.handleOpenFile(queryStr); + return true; + } model.handleOpenFile(s["file:path"]); + return true; }; const handleTab = (s: SuggestionType, query: string): string => { if (s["mime:type"] == "directory") { diff --git a/frontend/app/view/webview/webview.tsx b/frontend/app/view/webview/webview.tsx index 4d280c058..2a691ebb4 100644 --- a/frontend/app/view/webview/webview.tsx +++ b/frontend/app/view/webview/webview.tsx @@ -616,9 +616,10 @@ const BookmarkTypeahead = memo( onClose={() => model.setTypeaheadOpen(false)} onSelect={(suggestion) => { if (suggestion == null || suggestion.type != "url") { - return; + return true; } model.loadUrl(suggestion["url:url"], "bookmark-typeahead"); + return true; }} fetchSuggestions={model.fetchBookmarkSuggestions} placeholderText="Open Bookmark..." diff --git a/pkg/suggestion/filewalk.go b/pkg/suggestion/filewalk.go index 983e4c0e7..d31b728f3 100644 --- a/pkg/suggestion/filewalk.go +++ b/pkg/suggestion/filewalk.go @@ -4,61 +4,231 @@ package suggestion import ( + "container/list" "context" "fmt" "io/fs" "os" "path/filepath" + "strings" + "sync" + "time" - "github.com/wavetermdev/waveterm/pkg/util/utilfn" + "github.com/wavetermdev/waveterm/pkg/remote/fileshare" + "github.com/wavetermdev/waveterm/pkg/wshrpc" + "golang.org/x/sync/singleflight" ) const ListDirChanSize = 50 +// cache settings +const ( + maxCacheEntries = 20 + cacheTTL = 60 * time.Second +) + +type cacheEntry struct { + key string + value []DirEntryResult + expiration time.Time + lruElement *list.Element +} + +var ( + cache = make(map[string]*cacheEntry) + cacheLRU = list.New() + cacheMu sync.Mutex + + // group ensures only one listing per key is executed concurrently. + group singleflight.Group +) + +func init() { + go func() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for range ticker.C { + cleanCache() + } + }() +} + +func cleanCache() { + cacheMu.Lock() + defer cacheMu.Unlock() + now := time.Now() + for key, entry := range cache { + if now.After(entry.expiration) { + cacheLRU.Remove(entry.lruElement) + delete(cache, key) + } + } +} + +func getCache(key string) ([]DirEntryResult, bool) { + cacheMu.Lock() + defer cacheMu.Unlock() + entry, ok := cache[key] + if !ok { + return nil, false + } + if time.Now().After(entry.expiration) { + // expired + cacheLRU.Remove(entry.lruElement) + delete(cache, key) + return nil, false + } + // update LRU order + cacheLRU.MoveToFront(entry.lruElement) + return entry.value, true +} + +func setCache(key string, value []DirEntryResult) { + cacheMu.Lock() + defer cacheMu.Unlock() + // if already exists, update it + if entry, ok := cache[key]; ok { + entry.value = value + entry.expiration = time.Now().Add(cacheTTL) + cacheLRU.MoveToFront(entry.lruElement) + return + } + // evict if at capacity + if cacheLRU.Len() >= maxCacheEntries { + oldest := cacheLRU.Back() + if oldest != nil { + oldestKey := oldest.Value.(string) + if oldEntry, ok := cache[oldestKey]; ok { + cacheLRU.Remove(oldEntry.lruElement) + delete(cache, oldestKey) + } + } + } + // add new entry + elem := cacheLRU.PushFront(key) + cache[key] = &cacheEntry{ + key: key, + value: value, + expiration: time.Now().Add(cacheTTL), + lruElement: elem, + } +} + +// cacheDispose clears all cache entries for the provided widgetId. +func cacheDispose(widgetId string) { + cacheMu.Lock() + defer cacheMu.Unlock() + prefix := widgetId + "|" + for key, entry := range cache { + if strings.HasPrefix(key, prefix) { + cacheLRU.Remove(entry.lruElement) + delete(cache, key) + } + } +} + type DirEntryResult struct { Entry fs.DirEntry Err error } -func listDirectory(ctx context.Context, dir string, maxFiles int) (<-chan DirEntryResult, error) { - // Open the directory outside the goroutine for early error reporting. - f, err := os.Open(dir) - if err != nil { - return nil, err +func listS3Directory(ctx context.Context, widgetId string, conn string, dir string, maxFiles int) (<-chan DirEntryResult, error) { + if !strings.HasPrefix(conn, "aws:") { + return nil, fmt.Errorf("invalid S3 connection: %s", conn) + } + key := widgetId + "|" + dir + if cached, ok := getCache(key); ok { + ch := make(chan DirEntryResult, ListDirChanSize) + go func() { + defer close(ch) + for _, r := range cached { + select { + case ch <- r: + case <-ctx.Done(): + return + } + } + }() + return ch, nil } - // Ensure we have a directory. - fi, err := f.Stat() + // Ensure only one operation populates the cache for this key. + value, err, _ := group.Do(key, func() (interface{}, error) { + path := conn + ":s3://" + dir + entries, err := fileshare.ListEntries(ctx, path, &wshrpc.FileListOpts{Limit: maxFiles}) + if err != nil { + return nil, err + } + var results []DirEntryResult + for _, entry := range entries { + mockEntry := &MockDirEntry{ + NameStr: entry.Name, + IsDirVal: entry.IsDir, + FileMode: entry.Mode, + } + results = append(results, DirEntryResult{Entry: mockEntry}) + } + return results, nil + }) if err != nil { - f.Close() return nil, err } - if !fi.IsDir() { - f.Close() - return nil, fmt.Errorf("%s is not a directory", dir) - } + results := value.([]DirEntryResult) + setCache(key, results) ch := make(chan DirEntryResult, ListDirChanSize) go func() { defer close(ch) - // Make sure to close the directory when done. - defer f.Close() - - // Read up to maxFiles entries. - entries, err := f.ReadDir(maxFiles) - if err != nil { - utilfn.SendWithCtxCheck(ctx, ch, DirEntryResult{Err: err}) - return - } - - // Send each entry over the channel. - for _, entry := range entries { - ok := utilfn.SendWithCtxCheck(ctx, ch, DirEntryResult{Entry: entry}) - if !ok { + for _, r := range results { + select { + case ch <- r: + case <-ctx.Done(): return } } + }() + return ch, nil +} +func listDirectory(ctx context.Context, widgetId string, dir string, maxFiles int) (<-chan DirEntryResult, error) { + key := widgetId + "|" + dir + if cached, ok := getCache(key); ok { + ch := make(chan DirEntryResult, ListDirChanSize) + go func() { + defer close(ch) + for _, r := range cached { + select { + case ch <- r: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + } + + // Use singleflight to ensure only one listing operation occurs per key. + value, err, _ := group.Do(key, func() (interface{}, error) { + f, err := os.Open(dir) + if err != nil { + return nil, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, err + } + if !fi.IsDir() { + return nil, fmt.Errorf("%s is not a directory", dir) + } + entries, err := f.ReadDir(maxFiles) + if err != nil { + return nil, err + } + var results []DirEntryResult + for _, entry := range entries { + results = append(results, DirEntryResult{Entry: entry}) + } // Add parent directory (“..”) entry if not at the filesystem root. if filepath.Dir(dir) != dir { mockDir := &MockDirEntry{ @@ -66,7 +236,25 @@ func listDirectory(ctx context.Context, dir string, maxFiles int) (<-chan DirEnt IsDirVal: true, FileMode: fs.ModeDir | 0755, } - utilfn.SendWithCtxCheck(ctx, ch, DirEntryResult{Entry: mockDir}) + results = append(results, DirEntryResult{Entry: mockDir}) + } + return results, nil + }) + if err != nil { + return nil, err + } + results := value.([]DirEntryResult) + setCache(key, results) + + ch := make(chan DirEntryResult, ListDirChanSize) + go func() { + defer close(ch) + for _, r := range results { + select { + case ch <- r: + case <-ctx.Done(): + return + } } }() return ch, nil diff --git a/pkg/suggestion/suggestion.go b/pkg/suggestion/suggestion.go index 157cf587c..1aa727797 100644 --- a/pkg/suggestion/suggestion.go +++ b/pkg/suggestion/suggestion.go @@ -132,6 +132,10 @@ func resolveFileQuery(cwd string, query string) (string, string, string, error) return cwd, "", query, nil } +func DisposeSuggestions(ctx context.Context, widgetId string) { + cacheDispose(widgetId) +} + func FetchSuggestions(ctx context.Context, data wshrpc.FetchSuggestionsData) (*wshrpc.FetchSuggestionsResponse, error) { if data.SuggestionType == "file" { return fetchFileSuggestions(ctx, data) @@ -353,7 +357,7 @@ func (h *scoredEntryHeap) Pop() interface{} { return x } -func fetchFileSuggestions(_ context.Context, data wshrpc.FetchSuggestionsData) (*wshrpc.FetchSuggestionsResponse, error) { +func fetchFileSuggestions(ctx context.Context, data wshrpc.FetchSuggestionsData) (*wshrpc.FetchSuggestionsResponse, error) { // Only support file suggestions. if data.SuggestionType != "file" { return nil, fmt.Errorf("unsupported suggestion type: %q", data.SuggestionType) @@ -366,12 +370,20 @@ func fetchFileSuggestions(_ context.Context, data wshrpc.FetchSuggestionsData) ( } // Use a cancellable context for directory listing. - listingCtx, cancelFn := context.WithCancel(context.Background()) + listingCtx, cancelFn := context.WithCancel(ctx) defer cancelFn() - entriesCh, err := listDirectory(listingCtx, baseDir, 1000) - if err != nil { - return nil, fmt.Errorf("error listing directory: %w", err) + var entriesCh <-chan DirEntryResult + if strings.HasPrefix(data.FileConnection, "aws:") { + entriesCh, err = listS3Directory(listingCtx, data.WidgetId, data.FileConnection, baseDir, 1000) + if err != nil { + return nil, fmt.Errorf("error listing S3 directory: %w", err) + } + } else { + entriesCh, err = listDirectory(listingCtx, data.WidgetId, baseDir, 1000) + if err != nil { + return nil, fmt.Errorf("error listing directory: %w", err) + } } const maxEntries = MaxSuggestions // top-k entries diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 9e1a97af2..1ded90d83 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -154,6 +154,12 @@ func DisposeCommand(w *wshutil.WshRpc, data wshrpc.CommandDisposeData, opts *wsh return err } +// command "disposesuggestions", wshserver.DisposeSuggestionsCommand +func DisposeSuggestionsCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "disposesuggestions", data, opts) + return err +} + // command "eventpublish", wshserver.EventPublishCommand func EventPublishCommand(w *wshutil.WshRpc, data wps.WaveEvent, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "eventpublish", data, opts) diff --git a/pkg/wshrpc/wshremote/wshremote.go b/pkg/wshrpc/wshremote/wshremote.go index 9d0e0a718..e73703127 100644 --- a/pkg/wshrpc/wshremote/wshremote.go +++ b/pkg/wshrpc/wshremote/wshremote.go @@ -868,6 +868,11 @@ func (*ServerImpl) FetchSuggestionsCommand(ctx context.Context, data wshrpc.Fetc return suggestion.FetchSuggestions(ctx, data) } +func (*ServerImpl) DisposeSuggestionsCommand(ctx context.Context, widgetId string) error { + suggestion.DisposeSuggestions(ctx, widgetId) + return nil +} + func logPrintfDev(format string, args ...interface{}) { if wavebase.IsDevMode() { log.Printf(format, args...) diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 5d5e3ec52..cc4ef1e14 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -205,6 +205,7 @@ type WshRpcInterface interface { PathCommand(ctx context.Context, data PathCommandData) (string, error) SendTelemetryCommand(ctx context.Context) error FetchSuggestionsCommand(ctx context.Context, data FetchSuggestionsData) (*FetchSuggestionsResponse, error) + DisposeSuggestionsCommand(ctx context.Context, widgetId string) error GetTabCommand(ctx context.Context, tabId string) (*waveobj.Tab, error) // connection functions diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index 3a2e8768f..70029392b 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -961,6 +961,11 @@ func (ws *WshServer) FetchSuggestionsCommand(ctx context.Context, data wshrpc.Fe return suggestion.FetchSuggestions(ctx, data) } +func (ws *WshServer) DisposeSuggestionsCommand(ctx context.Context, widgetId string) error { + suggestion.DisposeSuggestions(ctx, widgetId) + return nil +} + func (ws *WshServer) GetTabCommand(ctx context.Context, tabId string) (*waveobj.Tab, error) { tab, err := wstore.DBGet[*waveobj.Tab](ctx, tabId) if err != nil { diff --git a/pkg/wshutil/wshrouter.go b/pkg/wshutil/wshrouter.go index 0e232a918..d4bb92c84 100644 --- a/pkg/wshutil/wshrouter.go +++ b/pkg/wshutil/wshrouter.go @@ -208,7 +208,7 @@ func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool localRouteId := router.getAnnouncedRoute(routeId) rpc := router.GetRpc(localRouteId) if rpc == nil { - log.Printf("[router] no rpc for local route id %q\n", localRouteId) + log.Printf("[router] no rpc for route id %q\n", routeId) return false } rpc.SendRpcMessage(msgBytes)