waveterm/cmd/main-server.go

484 lines
13 KiB
Go
Raw Normal View History

2022-06-12 22:39:48 +02:00
package main
import (
2022-07-01 21:17:19 +02:00
"context"
2022-06-12 22:39:48 +02:00
"encoding/json"
"errors"
2022-06-12 22:39:48 +02:00
"fmt"
"io/fs"
2022-06-12 22:39:48 +02:00
"net/http"
"os"
2022-07-15 10:57:45 +02:00
"runtime/debug"
2022-08-12 08:45:15 +02:00
"strconv"
2022-06-13 20:11:56 +02:00
"strings"
"sync"
2022-06-12 22:39:48 +02:00
"time"
2022-06-13 20:11:56 +02:00
"github.com/google/uuid"
2022-06-12 22:39:48 +02:00
"github.com/gorilla/mux"
"github.com/scripthaus-dev/sh2-server/pkg/cmdrunner"
"github.com/scripthaus-dev/sh2-server/pkg/remote"
"github.com/scripthaus-dev/sh2-server/pkg/scbase"
2022-07-02 22:31:56 +02:00
"github.com/scripthaus-dev/sh2-server/pkg/scpacket"
"github.com/scripthaus-dev/sh2-server/pkg/scws"
2022-06-13 20:11:56 +02:00
"github.com/scripthaus-dev/sh2-server/pkg/sstore"
2022-06-12 22:39:48 +02:00
"github.com/scripthaus-dev/sh2-server/pkg/wsshell"
)
const HttpReadTimeout = 5 * time.Second
const HttpWriteTimeout = 21 * time.Second
const HttpMaxHeaderBytes = 60000
const HttpTimeoutDuration = 21 * time.Second
const WebSocketServerAddr = "localhost:8081"
const MainServerAddr = "localhost:8080"
const WSStateReconnectTime = 30 * time.Second
const WSStatePacketChSize = 20
var GlobalLock = &sync.Mutex{}
var WSStateMap = make(map[string]*scws.WSState) // clientid -> WsState
2022-06-12 22:39:48 +02:00
func setWSState(state *scws.WSState) {
GlobalLock.Lock()
defer GlobalLock.Unlock()
WSStateMap[state.ClientId] = state
2022-06-12 22:39:48 +02:00
}
func getWSState(clientId string) *scws.WSState {
GlobalLock.Lock()
defer GlobalLock.Unlock()
return WSStateMap[clientId]
}
func removeWSStateAfterTimeout(clientId string, connectTime time.Time, waitDuration time.Duration) {
go func() {
time.Sleep(waitDuration)
GlobalLock.Lock()
defer GlobalLock.Unlock()
state := WSStateMap[clientId]
if state == nil || state.ConnectTime != connectTime {
return
}
delete(WSStateMap, clientId)
state.UnWatchScreen()
}()
}
2022-06-12 22:39:48 +02:00
func HandleWs(w http.ResponseWriter, r *http.Request) {
shell, err := wsshell.StartWS(w, r)
if err != nil {
fmt.Printf("WebSocket Upgrade Failed %T: %v\n", w, err)
2022-06-12 22:39:48 +02:00
return
}
defer shell.Conn.Close()
clientId := r.URL.Query().Get("clientid")
if clientId == "" {
close(shell.WriteChan)
return
}
state := getWSState(clientId)
if state == nil {
state = scws.MakeWSState(clientId)
state.ReplaceShell(shell)
setWSState(state)
} else {
state.UpdateConnectTime()
state.ReplaceShell(shell)
}
stateConnectTime := state.GetConnectTime()
defer func() {
removeWSStateAfterTimeout(clientId, stateConnectTime, WSStateReconnectTime)
2022-06-12 22:39:48 +02:00
}()
shell.WriteJson(map[string]interface{}{"type": "hello"}) // let client know we accepted this connection, ignore error
fmt.Printf("WebSocket opened %s %s\n", state.ClientId, shell.RemoteAddr)
state.RunWSRead()
2022-06-12 22:39:48 +02:00
}
2022-06-18 00:30:42 +02:00
// todo: sync multiple writes to the same fifoName into a single go-routine and do liveness checking on fifo
// if this returns an error, likely the fifo is dead and the cmd should be marked as 'done'
func writeToFifo(fifoName string, data []byte) error {
rwfd, err := os.OpenFile(fifoName, os.O_RDWR, 0600)
if err != nil {
return err
}
defer rwfd.Close()
fifoWriter, err := os.OpenFile(fifoName, os.O_WRONLY, 0600) // blocking open (open won't block because of rwfd)
if err != nil {
return err
}
defer fifoWriter.Close()
// this *could* block if the fifo buffer is full
// unlikely because if the reader is dead, and len(data) < pipe size, then the buffer will be empty and will clear after rwfd is closed
_, err = fifoWriter.Write(data)
if err != nil {
return err
}
return nil
}
2022-07-08 22:23:45 +02:00
// params: sessionid
2022-07-03 21:01:56 +02:00
func HandleGetSession(w http.ResponseWriter, r *http.Request) {
2022-07-02 22:31:56 +02:00
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
qvals := r.URL.Query()
2022-07-08 22:23:45 +02:00
sessionId := qvals.Get("sessionid")
if sessionId == "" {
WriteJsonError(w, fmt.Errorf("must specify a sessionid"))
2022-07-02 22:31:56 +02:00
return
}
2022-07-08 22:23:45 +02:00
session, err := sstore.GetSessionById(r.Context(), sessionId)
2022-07-02 22:31:56 +02:00
if err != nil {
WriteJsonError(w, err)
return
}
WriteJsonSuccess(w, session)
return
}
2022-07-08 22:23:45 +02:00
func HandleGetAllSessions(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
list, err := sstore.GetAllSessions(r.Context())
if err != nil {
WriteJsonError(w, fmt.Errorf("cannot get all sessions: %w", err))
return
}
WriteJsonSuccess(w, list)
return
}
2022-07-05 07:18:01 +02:00
// params: [none]
func HandleGetRemotes(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
remotes := remote.GetAllRemoteState()
WriteJsonSuccess(w, remotes)
return
}
2022-08-12 08:45:15 +02:00
// params: sessionid
func HandleGetHistory(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
qvals := r.URL.Query()
sessionId := qvals.Get("sessionid")
if _, err := uuid.Parse(sessionId); err != nil {
WriteJsonError(w, fmt.Errorf("invalid sessionid: %w", err))
return
}
numItems := 1000
numStr := qvals.Get("num")
if numStr != "" {
parsedNum, err := strconv.Atoi(numStr)
if err == nil {
numItems = parsedNum
}
}
hitems, err := sstore.GetSessionHistoryItems(r.Context(), sessionId, numItems)
if err != nil {
WriteJsonError(w, err)
return
}
rtnMap := make(map[string]interface{})
rtnMap["history"] = hitems
WriteJsonSuccess(w, rtnMap)
return
}
2022-07-02 22:31:56 +02:00
// params: sessionid, windowid
2022-07-08 22:23:45 +02:00
func HandleGetWindow(w http.ResponseWriter, r *http.Request) {
2022-07-02 22:31:56 +02:00
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
qvals := r.URL.Query()
sessionId := qvals.Get("sessionid")
windowId := qvals.Get("windowid")
if _, err := uuid.Parse(sessionId); err != nil {
WriteJsonError(w, fmt.Errorf("invalid sessionid: %w", err))
return
}
if _, err := uuid.Parse(windowId); err != nil {
WriteJsonError(w, fmt.Errorf("invalid windowid: %w", err))
return
}
2022-07-08 22:23:45 +02:00
window, err := sstore.GetWindowById(r.Context(), sessionId, windowId)
2022-07-02 22:31:56 +02:00
if err != nil {
WriteJsonError(w, err)
return
}
2022-07-08 22:23:45 +02:00
WriteJsonSuccess(w, window)
2022-07-02 22:31:56 +02:00
return
}
2022-07-03 21:01:56 +02:00
func HandleGetPtyOut(w http.ResponseWriter, r *http.Request) {
2022-06-12 22:39:48 +02:00
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
qvals := r.URL.Query()
sessionId := qvals.Get("sessionid")
cmdId := qvals.Get("cmdid")
if sessionId == "" || cmdId == "" {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("must specify sessionid and cmdid")))
return
}
if _, err := uuid.Parse(sessionId); err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("invalid sessionid: %v", err)))
return
}
if _, err := uuid.Parse(cmdId); err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("invalid cmdid: %v", err)))
return
}
_, data, err := sstore.ReadFullPtyOutFile(r.Context(), sessionId, cmdId)
2022-06-12 22:39:48 +02:00
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
w.WriteHeader(http.StatusOK)
return
}
2022-06-12 22:39:48 +02:00
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("error reading ptyout file: %v", err)))
2022-06-12 22:39:48 +02:00
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
2022-06-12 22:39:48 +02:00
}
func WriteJsonError(w http.ResponseWriter, errVal error) {
2022-06-13 20:11:56 +02:00
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
2022-06-12 22:39:48 +02:00
errMap := make(map[string]interface{})
errMap["error"] = errVal.Error()
barr, _ := json.Marshal(errMap)
w.Write(barr)
return
}
func WriteJsonSuccess(w http.ResponseWriter, data interface{}) {
2022-06-13 20:11:56 +02:00
w.Header().Set("Content-Type", "application/json")
2022-06-12 22:39:48 +02:00
rtnMap := make(map[string]interface{})
rtnMap["success"] = true
if data != nil {
rtnMap["data"] = data
}
barr, err := json.Marshal(rtnMap)
if err != nil {
WriteJsonError(w, err)
return
}
w.WriteHeader(200)
w.Write(barr)
return
}
func HandleRunCommand(w http.ResponseWriter, r *http.Request) {
2022-07-15 10:57:45 +02:00
defer func() {
r := recover()
if r == nil {
return
}
fmt.Printf("[error] in run-command: %v\n", r)
debug.PrintStack()
return
}()
2022-06-12 22:39:48 +02:00
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS")
w.Header().Set("Vary", "Origin")
w.Header().Set("Cache-Control", "no-cache")
if r.Method == "GET" || r.Method == "OPTIONS" {
w.WriteHeader(200)
return
}
decoder := json.NewDecoder(r.Body)
2022-07-02 22:31:56 +02:00
var commandPk scpacket.FeCommandPacketType
err := decoder.Decode(&commandPk)
2022-06-12 22:39:48 +02:00
if err != nil {
WriteJsonError(w, fmt.Errorf("error decoding json: %w", err))
return
}
update, err := cmdrunner.HandleCommand(r.Context(), &commandPk)
2022-07-03 21:01:56 +02:00
if err != nil {
WriteJsonError(w, err)
2022-06-13 20:11:56 +02:00
return
}
WriteJsonSuccess(w, update)
2022-07-03 21:01:56 +02:00
return
}
2022-06-12 22:39:48 +02:00
// /api/start-session
// returns:
// * userid
// * sessionid
//
// /api/ptyout (pos=[position]) - returns contents of ptyout file
// params:
// * sessionid
// * cmdid
// * pos
// returns:
// * stream of ptyout file (text, utf-8)
//
// POST /api/run-command
// params
// * userid
// * sessionid
// returns
// * cmdid
//
// /api/refresh-session
// params
// * sessionid
// * start -- can be negative
// * numlines
// returns
// * permissions (readonly, comment, command)
// * lines
// * lineid
// * ts
// * userid
// * linetype
// * text
// * cmdid
// /ws
// ->watch-session:
// * sessionid
// ->watch:
// * sessionid
// * cmdid
// ->focus:
// * sessionid
// * cmdid
// ->input:
// * sessionid
// * cmdid
// * data
// ->signal:
// * sessionid
// * cmdid
// * data
// <-data:
// * sessionid
// * cmdid
// * pos
// * data
// <-session-data:
// * sessionid
// * line
// session-doc
// timestamp | user | cmd-type | data
// cmd-type = comment
// cmd-type = command, commandid=ABC
func runWebSocketServer() {
gr := mux.NewRouter()
gr.HandleFunc("/ws", HandleWs)
server := &http.Server{
Addr: WebSocketServerAddr,
ReadTimeout: HttpReadTimeout,
WriteTimeout: HttpWriteTimeout,
MaxHeaderBytes: HttpMaxHeaderBytes,
Handler: gr,
}
server.SetKeepAlivesEnabled(false)
fmt.Printf("Running websocket server on %s\n", WebSocketServerAddr)
err := server.ListenAndServe()
if err != nil {
fmt.Printf("[error] trying to run websocket server: %v\n", err)
}
}
2022-06-12 22:39:48 +02:00
func main() {
scLock, err := scbase.AcquireSCLock()
if err != nil || scLock == nil {
fmt.Printf("[error] cannot acquire sh2 lock: %v\n", err)
return
}
if len(os.Args) >= 2 && strings.HasPrefix(os.Args[1], "--migrate") {
err := sstore.MigrateCommandOpts(os.Args[1:])
if err != nil {
fmt.Printf("[error] migrate cmd: %v\n", err)
}
return
}
err = sstore.TryMigrateUp()
if err != nil {
fmt.Printf("[error] migrate up: %v\n", err)
return
}
2022-07-01 21:17:19 +02:00
err = sstore.EnsureLocalRemote(context.Background())
if err != nil {
fmt.Printf("[error] ensuring local remote: %v\n", err)
return
}
err = sstore.AddTest01Remote(context.Background())
if err != nil {
fmt.Printf("[error] ensuring test01 remote: %v\n", err)
return
}
err = sstore.AddTest02Remote(context.Background())
if err != nil {
fmt.Printf("[error] ensuring test02 remote: %v\n", err)
return
}
_, err = sstore.EnsureDefaultSession(context.Background())
2022-07-01 23:45:33 +02:00
if err != nil {
fmt.Printf("[error] ensuring default session: %v\n", err)
return
2022-07-01 21:17:19 +02:00
}
userData, err := sstore.EnsureUserData(context.Background())
if err != nil {
fmt.Printf("[error] ensuring user data: %v\n", err)
return
}
fmt.Printf("userid = %s\n", userData.UserId)
err = remote.LoadRemotes(context.Background())
2022-06-12 22:39:48 +02:00
if err != nil {
fmt.Printf("[error] loading remotes: %v\n", err)
2022-06-12 22:39:48 +02:00
return
}
err = sstore.HangupAllRunningCmds(context.Background())
if err != nil {
fmt.Printf("[error] calling HUP on all running commands\n")
}
go runWebSocketServer()
2022-06-12 22:39:48 +02:00
gr := mux.NewRouter()
2022-07-03 21:01:56 +02:00
gr.HandleFunc("/api/ptyout", HandleGetPtyOut)
2022-07-08 22:23:45 +02:00
gr.HandleFunc("/api/get-all-sessions", HandleGetAllSessions)
gr.HandleFunc("/api/get-window", HandleGetWindow)
2022-07-05 07:18:01 +02:00
gr.HandleFunc("/api/get-remotes", HandleGetRemotes)
2022-08-12 08:45:15 +02:00
gr.HandleFunc("/api/get-history", HandleGetHistory)
2022-06-12 22:39:48 +02:00
gr.HandleFunc("/api/run-command", HandleRunCommand).Methods("GET", "POST", "OPTIONS")
server := &http.Server{
Addr: MainServerAddr,
2022-06-12 22:39:48 +02:00
ReadTimeout: HttpReadTimeout,
WriteTimeout: HttpWriteTimeout,
MaxHeaderBytes: HttpMaxHeaderBytes,
Handler: http.TimeoutHandler(gr, HttpTimeoutDuration, "Timeout"),
}
server.SetKeepAlivesEnabled(false)
fmt.Printf("Running main server on %s\n", MainServerAddr)
2022-06-12 22:39:48 +02:00
err = server.ListenAndServe()
if err != nil {
fmt.Printf("ERROR: %v\n", err)
}
}