on ws connect, send sessionlist and remotelist

This commit is contained in:
sawka 2022-09-05 12:42:09 -07:00
parent 74953c71ac
commit b980fd6b74
5 changed files with 76 additions and 150 deletions

View File

@ -90,7 +90,6 @@ func HandleWs(w http.ResponseWriter, r *http.Request) {
defer func() {
removeWSStateAfterTimeout(clientId, stateConnectTime, WSStateReconnectTime)
}()
shell.WriteJson(map[string]interface{}{"type": "hello"}) // let client know we accepted this connection, ignore error
fmt.Printf("WebSocket opened %s %s\n", state.ClientId, shell.RemoteAddr)
state.RunWSRead()
}
@ -117,57 +116,6 @@ func writeToFifo(fifoName string, data []byte) error {
return nil
}
// params: sessionid
func HandleGetSession(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
qvals := r.URL.Query()
sessionId := qvals.Get("sessionid")
if sessionId == "" {
WriteJsonError(w, fmt.Errorf("must specify a sessionid"))
return
}
session, err := sstore.GetSessionById(r.Context(), sessionId)
if err != nil {
WriteJsonError(w, err)
return
}
WriteJsonSuccess(w, session)
return
}
func HandleGetAllSessions(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
list, err := sstore.GetAllSessions(r.Context())
if err != nil {
WriteJsonError(w, fmt.Errorf("cannot get all sessions: %w", err))
return
}
WriteJsonSuccess(w, list)
return
}
// params: [none]
func HandleGetRemotes(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
remotes := remote.GetAllRemoteRuntimeState()
ifarr := make([]interface{}, len(remotes))
for idx, r := range remotes {
ifarr[idx] = r
}
update := sstore.ModelUpdate{Remotes: ifarr}
WriteJsonSuccess(w, update)
return
}
// params: sessionid, windowid
func HandleGetWindow(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
@ -295,72 +243,6 @@ func HandleRunCommand(w http.ResponseWriter, r *http.Request) {
return
}
// /api/start-session
// returns:
// * userid
// * sessionid
//
// /api/ptyout (pos=[position]) - returns contents of ptyout file
// params:
// * sessionid
// * cmdid
// * pos
// returns:
// * stream of ptyout file (text, utf-8)
//
// POST /api/run-command
// params
// * userid
// * sessionid
// returns
// * cmdid
//
// /api/refresh-session
// params
// * sessionid
// * start -- can be negative
// * numlines
// returns
// * permissions (readonly, comment, command)
// * lines
// * lineid
// * ts
// * userid
// * linetype
// * text
// * cmdid
// /ws
// ->watch-session:
// * sessionid
// ->watch:
// * sessionid
// * cmdid
// ->focus:
// * sessionid
// * cmdid
// ->input:
// * sessionid
// * cmdid
// * data
// ->signal:
// * sessionid
// * cmdid
// * data
// <-data:
// * sessionid
// * cmdid
// * pos
// * data
// <-session-data:
// * sessionid
// * line
// session-doc
// timestamp | user | cmd-type | data
// cmd-type = comment
// cmd-type = command, commandid=ABC
func runWebSocketServer() {
gr := mux.NewRouter()
gr.HandleFunc("/ws", HandleWs)
@ -438,9 +320,7 @@ func main() {
go runWebSocketServer()
gr := mux.NewRouter()
gr.HandleFunc("/api/ptyout", HandleGetPtyOut)
gr.HandleFunc("/api/get-all-sessions", HandleGetAllSessions)
gr.HandleFunc("/api/get-window", HandleGetWindow)
gr.HandleFunc("/api/get-remotes", HandleGetRemotes)
gr.HandleFunc("/api/run-command", HandleRunCommand).Methods("GET", "POST", "OPTIONS")
server := &http.Server{
Addr: MainServerAddr,

View File

@ -13,12 +13,13 @@ const WatchScreenPacketStr = "watchscreen"
const FeInputPacketStr = "feinput"
type FeCommandPacketType struct {
Type string `json:"type"`
MetaCmd string `json:"metacmd"`
MetaSubCmd string `json:"metasubcmd,omitempty"`
Args []string `json:"args,omitempty"`
Kwargs map[string]string `json:"kwargs,omitempty"`
UIContext *UIContextType `json:"uicontext,omitempty"`
Type string `json:"type"`
MetaCmd string `json:"metacmd"`
MetaSubCmd string `json:"metasubcmd,omitempty"`
Args []string `json:"args,omitempty"`
Kwargs map[string]string `json:"kwargs,omitempty"`
UIContext *UIContextType `json:"uicontext,omitempty"`
Interactive bool `json:"interactive"`
}
type UIContextType struct {
@ -39,6 +40,13 @@ type FeInputPacketType struct {
WinSizeCols int `json:"winsizecols"`
}
type WatchScreenPacketType struct {
Type string `json:"type"`
SessionId string `json:"sessionid"`
ScreenId string `json:"screenid"`
Connect bool `json:"connect"`
}
func init() {
packet.RegisterPacketType(FeCommandPacketStr, reflect.TypeOf(FeCommandPacketType{}))
packet.RegisterPacketType(WatchScreenPacketStr, reflect.TypeOf(WatchScreenPacketType{}))
@ -72,12 +80,6 @@ func (p *FeInputPacketType) ConvertToInputPacket() *packet.InputPacketType {
return rtn
}
type WatchScreenPacketType struct {
Type string `json:"type"`
SessionId string `json:"sessionid"`
ScreenId string `json:"screenid"`
}
func (*WatchScreenPacketType) GetType() string {
return WatchScreenPacketStr
}

View File

@ -1,6 +1,7 @@
package scws
import (
"context"
"fmt"
"sync"
"time"
@ -75,7 +76,7 @@ func (ws *WSState) WatchScreen(sessionId string, screenId string) {
ws.SessionId = sessionId
ws.ScreenId = screenId
ws.UpdateCh = sstore.MainBus.RegisterChannel(ws.ClientId, ws.SessionId)
go ws.RunUpdates()
go ws.RunUpdates(ws.UpdateCh)
}
func (ws *WSState) UnWatchScreen() {
@ -84,6 +85,7 @@ func (ws *WSState) UnWatchScreen() {
sstore.MainBus.UnregisterChannel(ws.ClientId)
ws.SessionId = ""
ws.ScreenId = ""
fmt.Printf("[ws] unwatch screen clientid=%s\n", ws.ClientId)
}
func (ws *WSState) getUpdateCh() chan interface{} {
@ -92,10 +94,9 @@ func (ws *WSState) getUpdateCh() chan interface{} {
return ws.UpdateCh
}
func (ws *WSState) RunUpdates() {
updateCh := ws.getUpdateCh()
func (ws *WSState) RunUpdates(updateCh chan interface{}) {
if updateCh == nil {
return
panic("invalid nil updateCh passed to RunUpdates")
}
for update := range updateCh {
shell := ws.GetShell()
@ -117,11 +118,60 @@ func (ws *WSState) ReplaceShell(shell *wsshell.WSShell) {
return
}
func (ws *WSState) handleConnection() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
update, err := sstore.GetAllSessions(ctx)
if err != nil {
return fmt.Errorf("getting sessions: %w", err)
}
remotes := remote.GetAllRemoteRuntimeState()
ifarr := make([]interface{}, len(remotes))
for idx, r := range remotes {
ifarr[idx] = r
}
update.Remotes = ifarr
update.Connect = true
err = ws.Shell.WriteJson(update)
if err != nil {
return err
}
return nil
}
func (ws *WSState) handleWatchScreen(wsPk *scpacket.WatchScreenPacketType) error {
if wsPk.SessionId != "" {
if _, err := uuid.Parse(wsPk.SessionId); err != nil {
return fmt.Errorf("invalid watchscreen sessionid: %w", err)
}
}
if wsPk.ScreenId != "" {
if _, err := uuid.Parse(wsPk.ScreenId); err != nil {
return fmt.Errorf("invalid watchscreen screenid: %w", err)
}
}
if wsPk.SessionId == "" || wsPk.ScreenId == "" {
ws.UnWatchScreen()
} else {
ws.WatchScreen(wsPk.SessionId, wsPk.ScreenId)
fmt.Printf("[ws %s] watchscreen %s/%s\n", ws.ClientId, wsPk.SessionId, wsPk.ScreenId)
}
if wsPk.Connect {
fmt.Printf("[ws %s] watchscreen connect\n", ws.ClientId)
err := ws.handleConnection()
if err != nil {
return fmt.Errorf("connect: %w", err)
}
}
return nil
}
func (ws *WSState) RunWSRead() {
shell := ws.GetShell()
if shell == nil {
return
}
shell.WriteJson(map[string]interface{}{"type": "hello"}) // let client know we accepted this connection, ignore error
for msgBytes := range shell.ReadChan {
pk, err := packet.ParseJsonPacket(msgBytes)
if err != nil {
@ -149,20 +199,11 @@ func (ws *WSState) RunWSRead() {
}
if pk.GetType() == "watchscreen" {
wsPk := pk.(*scpacket.WatchScreenPacketType)
if _, err := uuid.Parse(wsPk.SessionId); err != nil {
fmt.Printf("[error] invalid watchscreen sessionid: %v\n", err)
continue
err := ws.handleWatchScreen(wsPk)
if err != nil {
// TODO send errors back to client, likely unrecoverable
fmt.Printf("[ws %s] error %v\n", err)
}
if wsPk.ScreenId == "" {
ws.UnWatchScreen()
continue
}
if _, err := uuid.Parse(wsPk.ScreenId); err != nil {
fmt.Printf("[error] invalid watchscreen screenid: %v\n", err)
continue
}
ws.WatchScreen(wsPk.SessionId, wsPk.ScreenId)
fmt.Printf("[ws] watch screen clientid=%s %s/%s\n", ws.ClientId, wsPk.SessionId, wsPk.ScreenId)
continue
}
fmt.Printf("got ws bad message: %v\n", pk.GetType())

View File

@ -36,6 +36,8 @@ type ModelUpdate struct {
Info *InfoMsgType `json:"info,omitempty"`
Remotes []interface{} `json:"remotes,omitempty"` // []*remote.RemoteState
History *HistoryInfoType `json:"history,omitempty"`
Interactive bool `json:"interactive"`
Connect bool `json:"connect,omitempty"`
}
func (ModelUpdate) UpdateType() string {
@ -123,6 +125,7 @@ func MakeUpdateBus() *UpdateBus {
}
}
// always returns a new channel
func (bus *UpdateBus) RegisterChannel(clientId string, sessionId string) chan interface{} {
bus.Lock.Lock()
defer bus.Lock.Unlock()

View File

@ -20,7 +20,7 @@ const initialPingTime = 1 * time.Second
var upgrader = websocket.Upgrader{
ReadBufferSize: 4 * 1024,
WriteBufferSize: 4 * 1024,
WriteBufferSize: 32 * 1024,
HandshakeTimeout: 1 * time.Second,
CheckOrigin: func(r *http.Request) bool { return true },
}