diff --git a/cmd/wsh/cmd/wshcmd-server.go b/cmd/wsh/cmd/wshcmd-server.go index d0e01ccb3..003a2e955 100644 --- a/cmd/wsh/cmd/wshcmd-server.go +++ b/cmd/wsh/cmd/wshcmd-server.go @@ -4,23 +4,13 @@ package cmd import ( - "context" - "encoding/base64" - "errors" - "fmt" - "io" "os" - "path/filepath" "github.com/spf13/cobra" - "github.com/wavetermdev/thenextwave/pkg/util/utilfn" - "github.com/wavetermdev/thenextwave/pkg/wavebase" - "github.com/wavetermdev/thenextwave/pkg/wshrpc" "github.com/wavetermdev/thenextwave/pkg/wshrpc/wshclient" + "github.com/wavetermdev/thenextwave/pkg/wshrpc/wshremote" ) -const MaxFileSize = 50 * 1024 * 1024 // 10M - var serverCmd = &cobra.Command{ Use: "server", Short: "remote server to power wave blocks", @@ -28,207 +18,13 @@ var serverCmd = &cobra.Command{ Run: serverRun, } -type ServerImpl struct{} - -func (*ServerImpl) WshServerImpl() {} - -func (*ServerImpl) MessageCommand(ctx context.Context, data wshrpc.CommandMessageData) error { - WriteStderr("[message] %q\n", data.Message) - return nil -} - -func respErr(err error) wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] { - return wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData]{Error: err} -} - -type ByteRangeType struct { - All bool - Start int64 - End int64 -} - -func parseByteRange(rangeStr string) (ByteRangeType, error) { - if rangeStr == "" { - return ByteRangeType{All: true}, nil - } - var start, end int64 - _, err := fmt.Sscanf(rangeStr, "%d-%d", &start, &end) - if err != nil { - return ByteRangeType{}, errors.New("invalid byte range") - } - if start < 0 || end < 0 || start > end { - return ByteRangeType{}, errors.New("invalid byte range") - } - return ByteRangeType{Start: start, End: end}, nil -} - -func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange ByteRangeType, dataCallback func(fileInfo *wshrpc.FileInfo, data []byte)) error { - innerFilesEntries, err := os.ReadDir(path) - if err != nil { - return fmt.Errorf("cannot open dir %q: %w", path, err) - } - if byteRange.All { - if len(innerFilesEntries) > 1000 { - innerFilesEntries = innerFilesEntries[:1000] - } - } else { - if byteRange.Start >= int64(len(innerFilesEntries)) { - return nil - } - realEnd := byteRange.End - if realEnd > int64(len(innerFilesEntries)) { - realEnd = int64(len(innerFilesEntries)) - } - innerFilesEntries = innerFilesEntries[byteRange.Start:realEnd] - } - parent := filepath.Dir(path) - parentFileInfo, err := impl.RemoteFileInfoCommand(ctx, parent) - if err == nil && parent != path { - parentFileInfo.Name = ".." - parentFileInfo.Size = -1 - dataCallback(parentFileInfo, nil) - } - for _, innerFileEntry := range innerFilesEntries { - if ctx.Err() != nil { - return ctx.Err() - } - innerFileInfoInt, err := innerFileEntry.Info() - if err != nil { - continue - } - mimeType := utilfn.DetectMimeType(filepath.Join(path, innerFileInfoInt.Name())) - var fileSize int64 - if mimeType == "directory" { - fileSize = -1 - } else { - fileSize = innerFileInfoInt.Size() - } - innerFileInfo := wshrpc.FileInfo{ - Path: filepath.Join(path, innerFileInfoInt.Name()), - Name: innerFileInfoInt.Name(), - Size: fileSize, - Mode: innerFileInfoInt.Mode(), - ModeStr: innerFileInfoInt.Mode().String(), - ModTime: innerFileInfoInt.ModTime().UnixMilli(), - IsDir: innerFileInfoInt.IsDir(), - MimeType: mimeType, - } - dataCallback(&innerFileInfo, nil) - } - return nil -} - -func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange ByteRangeType, dataCallback func(fileInfo *wshrpc.FileInfo, data []byte)) error { - fd, err := os.Open(path) - if err != nil { - return fmt.Errorf("cannot open file %q: %w", path, err) - } - defer fd.Close() - var filePos int64 - if !byteRange.All && byteRange.Start > 0 { - _, err := fd.Seek(byteRange.Start, io.SeekStart) - if err != nil { - return fmt.Errorf("seeking file %q: %w", path, err) - } - filePos = byteRange.Start - } - buf := make([]byte, 4096) - for { - if ctx.Err() != nil { - return ctx.Err() - } - n, err := fd.Read(buf) - if n > 0 { - if !byteRange.All && filePos+int64(n) > byteRange.End { - n = int(byteRange.End - filePos) - } - filePos += int64(n) - dataCallback(nil, buf[:n]) - } - if filePos >= byteRange.End { - break - } - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return fmt.Errorf("reading file %q: %w", path, err) - } - } - return nil -} - -func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo *wshrpc.FileInfo, data []byte)) error { - byteRange, err := parseByteRange(data.ByteRange) - if err != nil { - return err - } - path := data.Path - path = wavebase.ExpandHomeDir(path) - finfo, err := impl.RemoteFileInfoCommand(ctx, path) - if err != nil { - return fmt.Errorf("cannot stat file %q: %w", path, err) - } - dataCallback(finfo, nil) - if finfo.NotFound { - return nil - } - if finfo.Size > MaxFileSize { - return fmt.Errorf("file %q is too large to read, use /wave/stream-file", path) - } - if finfo.IsDir { - return impl.remoteStreamFileDir(ctx, path, byteRange, dataCallback) - } else { - return impl.remoteStreamFileRegular(ctx, path, byteRange, dataCallback) - } -} - -func (impl *ServerImpl) RemoteStreamFileCommand(ctx context.Context, data wshrpc.CommandRemoteStreamFileData) chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] { - ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData], 16) - defer close(ch) - err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo *wshrpc.FileInfo, data []byte) { - resp := wshrpc.CommandRemoteStreamFileRtnData{} - resp.FileInfo = fileInfo - if len(data) > 0 { - resp.Data64 = base64.RawStdEncoding.EncodeToString(data) - } - ch <- wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData]{Response: resp} - }) - if err != nil { - ch <- respErr(err) - } - return ch -} - -func (*ServerImpl) RemoteFileInfoCommand(ctx context.Context, path string) (*wshrpc.FileInfo, error) { - cleanedPath := filepath.Clean(wavebase.ExpandHomeDir(path)) - finfo, err := os.Stat(cleanedPath) - if os.IsNotExist(err) { - return &wshrpc.FileInfo{Path: wavebase.ReplaceHomeDir(path), NotFound: true}, nil - } - if err != nil { - return nil, fmt.Errorf("cannot stat file %q: %w", path, err) - } - mimeType := utilfn.DetectMimeType(cleanedPath) - return &wshrpc.FileInfo{ - Path: cleanedPath, - Name: finfo.Name(), - Size: finfo.Size(), - Mode: finfo.Mode(), - ModeStr: finfo.Mode().String(), - ModTime: finfo.ModTime().UnixMilli(), - IsDir: finfo.IsDir(), - MimeType: mimeType, - }, nil -} - func init() { rootCmd.AddCommand(serverCmd) } func serverRun(cmd *cobra.Command, args []string) { WriteStdout("running wsh server\n") - RpcClient.SetServerImpl(&ServerImpl{}) + RpcClient.SetServerImpl(&wshremote.ServerImpl{LogWriter: os.Stdout}) err := wshclient.TestCommand(RpcClient, "hello", nil) WriteStdout("got test rtn: %v\n", err) diff --git a/frontend/app/store/wshserver.ts b/frontend/app/store/wshserver.ts index 9cce214f6..9735765da 100644 --- a/frontend/app/store/wshserver.ts +++ b/frontend/app/store/wshserver.ts @@ -97,6 +97,11 @@ class WshServerType { return WOS.wshServerRpcHelper_responsestream("remotestreamfile", data, opts); } + // command "remotewritefile" [call] + RemoteWriteFileCommand(data: CommandRemoteWriteFileData, opts?: RpcOpts): Promise { + return WOS.wshServerRpcHelper_call("remotewritefile", data, opts); + } + // command "resolveids" [call] ResolveIdsCommand(data: CommandResolveIdsData, opts?: RpcOpts): Promise { return WOS.wshServerRpcHelper_call("resolveids", data, opts); diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index e76c1bae7..f2cca14ee 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -128,6 +128,13 @@ declare global { data64?: string; }; + // wshrpc.CommandRemoteWriteFileData + type CommandRemoteWriteFileData = { + path: string; + data64: string; + createmode?: number; + }; + // wshrpc.CommandResolveIdsData type CommandResolveIdsData = { blockid: string; diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 5f6fb2535..c16760bfb 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -118,6 +118,12 @@ func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamF return sendRpcRequestResponseStreamHelper[wshrpc.CommandRemoteStreamFileRtnData](w, "remotestreamfile", data, opts) } +// command "remotewritefile", wshserver.RemoteWriteFileCommand +func RemoteWriteFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteWriteFileData, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "remotewritefile", data, opts) + return err +} + // command "resolveids", wshserver.ResolveIdsCommand func ResolveIdsCommand(w *wshutil.WshRpc, data wshrpc.CommandResolveIdsData, opts *wshrpc.RpcOpts) (wshrpc.CommandResolveIdsRtnData, error) { resp, err := sendRpcRequestCallHelper[wshrpc.CommandResolveIdsRtnData](w, "resolveids", data, opts) diff --git a/pkg/wshrpc/wshremote/wshremote.go b/pkg/wshrpc/wshremote/wshremote.go new file mode 100644 index 000000000..241722929 --- /dev/null +++ b/pkg/wshrpc/wshremote/wshremote.go @@ -0,0 +1,244 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshremote + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "log" + "os" + "path/filepath" + + "github.com/wavetermdev/thenextwave/pkg/util/utilfn" + "github.com/wavetermdev/thenextwave/pkg/wavebase" + "github.com/wavetermdev/thenextwave/pkg/wshrpc" +) + +const MaxFileSize = 50 * 1024 * 1024 // 10M + +type ServerImpl struct { + LogWriter io.Writer +} + +func (*ServerImpl) WshServerImpl() {} + +func (impl *ServerImpl) Log(format string, args ...interface{}) { + if impl.LogWriter != nil { + fmt.Fprintf(impl.LogWriter, format, args...) + } else { + log.Printf(format, args...) + } +} + +func (impl *ServerImpl) MessageCommand(ctx context.Context, data wshrpc.CommandMessageData) error { + impl.Log("[message] %q\n", data.Message) + return nil +} + +func respErr(err error) wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] { + return wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData]{Error: err} +} + +type ByteRangeType struct { + All bool + Start int64 + End int64 +} + +func parseByteRange(rangeStr string) (ByteRangeType, error) { + if rangeStr == "" { + return ByteRangeType{All: true}, nil + } + var start, end int64 + _, err := fmt.Sscanf(rangeStr, "%d-%d", &start, &end) + if err != nil { + return ByteRangeType{}, errors.New("invalid byte range") + } + if start < 0 || end < 0 || start > end { + return ByteRangeType{}, errors.New("invalid byte range") + } + return ByteRangeType{Start: start, End: end}, nil +} + +func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange ByteRangeType, dataCallback func(fileInfo *wshrpc.FileInfo, data []byte)) error { + innerFilesEntries, err := os.ReadDir(path) + if err != nil { + return fmt.Errorf("cannot open dir %q: %w", path, err) + } + if byteRange.All { + if len(innerFilesEntries) > 1000 { + innerFilesEntries = innerFilesEntries[:1000] + } + } else { + if byteRange.Start >= int64(len(innerFilesEntries)) { + return nil + } + realEnd := byteRange.End + if realEnd > int64(len(innerFilesEntries)) { + realEnd = int64(len(innerFilesEntries)) + } + innerFilesEntries = innerFilesEntries[byteRange.Start:realEnd] + } + parent := filepath.Dir(path) + parentFileInfo, err := impl.RemoteFileInfoCommand(ctx, parent) + if err == nil && parent != path { + parentFileInfo.Name = ".." + parentFileInfo.Size = -1 + dataCallback(parentFileInfo, nil) + } + for _, innerFileEntry := range innerFilesEntries { + if ctx.Err() != nil { + return ctx.Err() + } + innerFileInfoInt, err := innerFileEntry.Info() + if err != nil { + continue + } + mimeType := utilfn.DetectMimeType(filepath.Join(path, innerFileInfoInt.Name())) + var fileSize int64 + if mimeType == "directory" { + fileSize = -1 + } else { + fileSize = innerFileInfoInt.Size() + } + innerFileInfo := wshrpc.FileInfo{ + Path: filepath.Join(path, innerFileInfoInt.Name()), + Name: innerFileInfoInt.Name(), + Size: fileSize, + Mode: innerFileInfoInt.Mode(), + ModeStr: innerFileInfoInt.Mode().String(), + ModTime: innerFileInfoInt.ModTime().UnixMilli(), + IsDir: innerFileInfoInt.IsDir(), + MimeType: mimeType, + } + dataCallback(&innerFileInfo, nil) + } + return nil +} + +func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange ByteRangeType, dataCallback func(fileInfo *wshrpc.FileInfo, data []byte)) error { + fd, err := os.Open(path) + if err != nil { + return fmt.Errorf("cannot open file %q: %w", path, err) + } + defer fd.Close() + var filePos int64 + if !byteRange.All && byteRange.Start > 0 { + _, err := fd.Seek(byteRange.Start, io.SeekStart) + if err != nil { + return fmt.Errorf("seeking file %q: %w", path, err) + } + filePos = byteRange.Start + } + buf := make([]byte, 4096) + for { + if ctx.Err() != nil { + return ctx.Err() + } + n, err := fd.Read(buf) + if n > 0 { + if !byteRange.All && filePos+int64(n) > byteRange.End { + n = int(byteRange.End - filePos) + } + filePos += int64(n) + dataCallback(nil, buf[:n]) + } + if filePos >= byteRange.End { + break + } + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return fmt.Errorf("reading file %q: %w", path, err) + } + } + return nil +} + +func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo *wshrpc.FileInfo, data []byte)) error { + byteRange, err := parseByteRange(data.ByteRange) + if err != nil { + return err + } + path := data.Path + path = wavebase.ExpandHomeDir(path) + finfo, err := impl.RemoteFileInfoCommand(ctx, path) + if err != nil { + return fmt.Errorf("cannot stat file %q: %w", path, err) + } + dataCallback(finfo, nil) + if finfo.NotFound { + return nil + } + if finfo.Size > MaxFileSize { + return fmt.Errorf("file %q is too large to read, use /wave/stream-file", path) + } + if finfo.IsDir { + return impl.remoteStreamFileDir(ctx, path, byteRange, dataCallback) + } else { + return impl.remoteStreamFileRegular(ctx, path, byteRange, dataCallback) + } +} + +func (impl *ServerImpl) RemoteStreamFileCommand(ctx context.Context, data wshrpc.CommandRemoteStreamFileData) chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] { + ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData], 16) + defer close(ch) + err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo *wshrpc.FileInfo, data []byte) { + resp := wshrpc.CommandRemoteStreamFileRtnData{} + resp.FileInfo = fileInfo + if len(data) > 0 { + resp.Data64 = base64.RawStdEncoding.EncodeToString(data) + } + ch <- wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData]{Response: resp} + }) + if err != nil { + ch <- respErr(err) + } + return ch +} + +func (*ServerImpl) RemoteFileInfoCommand(ctx context.Context, path string) (*wshrpc.FileInfo, error) { + cleanedPath := filepath.Clean(wavebase.ExpandHomeDir(path)) + finfo, err := os.Stat(cleanedPath) + if os.IsNotExist(err) { + return &wshrpc.FileInfo{Path: wavebase.ReplaceHomeDir(path), NotFound: true}, nil + } + if err != nil { + return nil, fmt.Errorf("cannot stat file %q: %w", path, err) + } + mimeType := utilfn.DetectMimeType(cleanedPath) + return &wshrpc.FileInfo{ + Path: cleanedPath, + Name: finfo.Name(), + Size: finfo.Size(), + Mode: finfo.Mode(), + ModeStr: finfo.Mode().String(), + ModTime: finfo.ModTime().UnixMilli(), + IsDir: finfo.IsDir(), + MimeType: mimeType, + }, nil +} + +func (*ServerImpl) RemoteWriteFileCommand(ctx context.Context, data wshrpc.CommandRemoteWriteFileData) error { + path := wavebase.ExpandHomeDir(data.Path) + createMode := data.CreateMode + if createMode == 0 { + createMode = 0644 + } + dataSize := base64.StdEncoding.DecodedLen(len(data.Data64)) + dataBytes := make([]byte, dataSize) + n, err := base64.StdEncoding.Decode(dataBytes, []byte(data.Data64)) + if err != nil { + return fmt.Errorf("cannot decode base64 data: %w", err) + } + err = os.WriteFile(path, dataBytes[:n], createMode) + if err != nil { + return fmt.Errorf("cannot write file %q: %w", path, err) + } + return nil +} diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 03e8e4805..9691209f8 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -49,6 +49,7 @@ const ( Command_Test = "test" Command_RemoteStreamFile = "remotestreamfile" Command_RemoteFileInfo = "remotefileinfo" + Command_RemoteWriteFile = "remotewritefile" Command_Event = "event" ) @@ -86,6 +87,7 @@ type WshRpcInterface interface { // remotes RemoteStreamFileCommand(ctx context.Context, data CommandRemoteStreamFileData) chan RespOrErrorUnion[CommandRemoteStreamFileRtnData] RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error) + RemoteWriteFileCommand(ctx context.Context, data CommandRemoteWriteFileData) error } // for frontend @@ -278,3 +280,9 @@ type CommandRemoteStreamFileRtnData struct { FileInfo *FileInfo `json:"fileinfo,omitempty"` Data64 string `json:"data64,omitempty"` } + +type CommandRemoteWriteFileData struct { + Path string `json:"path"` + Data64 string `json:"data64"` + CreateMode os.FileMode `json:"createmode,omitempty"` +}