waveterm/cmd/main-server.go
2023-03-26 18:48:43 -07:00

600 lines
16 KiB
Go

package main
import (
	"context"
	"crypto/subtle"
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"log"
	"net/http"
	"os"
	"os/signal"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/mux"
	"github.com/scripthaus-dev/sh2-server/pkg/cmdrunner"
	"github.com/scripthaus-dev/sh2-server/pkg/pcloud"
	"github.com/scripthaus-dev/sh2-server/pkg/remote"
	"github.com/scripthaus-dev/sh2-server/pkg/rtnstate"
	"github.com/scripthaus-dev/sh2-server/pkg/scbase"
	"github.com/scripthaus-dev/sh2-server/pkg/scpacket"
	"github.com/scripthaus-dev/sh2-server/pkg/scws"
	"github.com/scripthaus-dev/sh2-server/pkg/sstore"
	"github.com/scripthaus-dev/sh2-server/pkg/wsshell"
)
// WebFnType is the function signature used by the HTTP handlers in this file.
// It is a type alias (not a new type), so any plain function with this
// signature can be passed wherever a WebFnType is expected.
type WebFnType = func(http.ResponseWriter, *http.Request)
// Settings applied to the http.Server values built in this file.
// HttpTimeoutDuration is the per-request limit handed to http.TimeoutHandler
// for the main API server.
const (
	HttpReadTimeout     = 5 * time.Second
	HttpWriteTimeout    = 21 * time.Second
	HttpMaxHeaderBytes  = 60000
	HttpTimeoutDuration = 21 * time.Second
)

// Listen addresses. The *DevAddr variants are selected when scbase.IsDevMode()
// reports true.
const (
	MainServerAddr         = "localhost:1619" // PromptServer, P=16, S=19, PS=1619
	WebSocketServerAddr    = "localhost:1623" // PromptWebsock, P=16, W=23, PW=1623
	MainServerDevAddr      = "localhost:8090"
	WebSocketServerDevAddr = "localhost:8091"
)

// Websocket state settings. WSStateReconnectTime is how long a client's state
// is kept after its connection handler returns, so a reconnect can reuse it.
// WSStatePacketChSize is not referenced in this file.
const (
	WSStateReconnectTime = 30 * time.Second
	WSStatePacketChSize  = 20
)

// Telemetry scheduling: wait InitialTelemetryWait after startup, then wake up
// every TelemetryTick and send once at least TelemetryInterval has elapsed
// since the previous send.
const (
	InitialTelemetryWait = 30 * time.Second
	TelemetryTick        = 30 * time.Minute
	TelemetryInterval    = 8 * time.Hour
)
var (
	// GlobalLock is held around every access to WSStateMap in this file.
	GlobalLock = new(sync.Mutex)

	// WSStateMap holds the live websocket state per client (clientid -> WsState).
	WSStateMap = map[string]*scws.WSState{}

	// GlobalAuthKey is the key requests must present in the X-AuthKey header;
	// main fills it in from scbase.ReadPromptAuthKey().
	GlobalAuthKey string

	// BuildTime is copied into scbase.BuildTime at startup. It defaults to "0";
	// presumably it is overridden at link time -- TODO confirm against the build scripts.
	BuildTime = "0"

	// shutdownOnce makes doShutdown run its shutdown sequence at most once.
	shutdownOnce sync.Once
)
// ClientActiveState is the JSON body accepted by HandleLogActiveState.
// Each flag that is true causes the matching per-minute activity counter
// (fg / active / open) to be bumped by one.
type ClientActiveState struct {
	Fg     bool `json:"fg"`
	Active bool `json:"active"`
	Open   bool `json:"open"`
}
// setWSState registers state in WSStateMap under its ClientId, replacing any
// entry already stored for that client. The map write happens under GlobalLock.
func setWSState(state *scws.WSState) {
	clientId := state.ClientId
	GlobalLock.Lock()
	defer GlobalLock.Unlock()
	WSStateMap[clientId] = state
}
// getWSState looks up the websocket state registered for clientId.
// It returns nil when no state is registered. The map read happens under GlobalLock.
func getWSState(clientId string) *scws.WSState {
	GlobalLock.Lock()
	found := WSStateMap[clientId]
	GlobalLock.Unlock()
	return found
}
// removeWSStateAfterTimeout schedules cleanup of a client's websocket state.
//
// It starts a goroutine that sleeps for waitDuration and then, under
// GlobalLock, removes the WSStateMap entry for clientId and calls
// UnWatchScreen on it -- but only if the entry's connect time still equals
// connectTime. A client that reconnected in the meantime has had its connect
// time updated (see HandleWs), so its state is left alone.
//
// The function returns immediately; the goroutine ends after the single check.
func removeWSStateAfterTimeout(clientId string, connectTime time.Time, waitDuration time.Duration) {
	go func() {
		time.Sleep(waitDuration)
		GlobalLock.Lock()
		defer GlobalLock.Unlock()
		state := WSStateMap[clientId]
		if state == nil {
			return
		}
		// Read the connect time through the accessor (the same one HandleWs
		// uses) instead of touching the field directly: UpdateConnectTime may
		// run concurrently on a reconnect, and GlobalLock does not guard the
		// state's own fields. time.Time values are compared with Equal rather
		// than != as recommended by the time package.
		if !state.GetConnectTime().Equal(connectTime) {
			return
		}
		delete(WSStateMap, clientId)
		state.UnWatchScreen()
	}()
}
// HandleWs serves the /ws endpoint.
//
// It hands the request to wsshell.StartWS to obtain a shell for the
// connection, then requires a non-empty "clientid" query parameter. An
// existing WSState for that client is reused (its connect time is refreshed
// and its shell swapped); otherwise a new one is created and registered.
// The call then blocks in RunWSRead. When it returns, removal of the state is
// scheduled WSStateReconnectTime later, keyed on the connect time captured
// here, and the connection is closed.
func HandleWs(w http.ResponseWriter, r *http.Request) {
	shell, err := wsshell.StartWS(w, r)
	if err != nil {
		log.Printf("WebSocket Upgrade Failed %T: %v\n", w, err)
		return
	}
	defer shell.Conn.Close()

	clientId := r.URL.Query().Get("clientid")
	if clientId == "" {
		// no client id: nothing to attach the shell to
		close(shell.WriteChan)
		return
	}

	wsState := getWSState(clientId)
	if wsState != nil {
		// reconnect: keep the existing state, just point it at the new shell
		wsState.UpdateConnectTime()
		wsState.ReplaceShell(shell)
	} else {
		wsState = scws.MakeWSState(clientId, GlobalAuthKey)
		wsState.ReplaceShell(shell)
		setWSState(wsState)
	}

	// Captured now so the delayed cleanup can tell whether a newer connection
	// has taken over this state in the meantime.
	connectedAt := wsState.GetConnectTime()
	defer removeWSStateAfterTimeout(clientId, connectedAt, WSStateReconnectTime)

	log.Printf("WebSocket opened %s %s\n", wsState.ClientId, shell.RemoteAddr)
	wsState.RunWSRead()
}
// writeToFifo writes data to the fifo at path fifoName and returns the first
// error encountered (open or write), or nil on success.
//
// The path is opened twice: first read-write, then write-only. Holding the
// read-write descriptor means the write-only open does not block waiting for
// a reader. Both descriptors are closed before returning.
//
// todo: sync multiple writes to the same fifoName into a single go-routine and do liveness checking on fifo
// if this returns an error, likely the fifo is dead and the cmd should be marked as 'done'
func writeToFifo(fifoName string, data []byte) error {
	holder, err := os.OpenFile(fifoName, os.O_RDWR, 0600)
	if err != nil {
		return err
	}
	defer holder.Close()

	// blocking open (open won't block because of holder)
	writer, err := os.OpenFile(fifoName, os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	defer writer.Close()

	// this *could* block if the fifo buffer is full
	// unlikely because if the reader is dead, and len(data) < pipe size, then
	// the buffer will be empty and will clear after holder is closed
	_, err = writer.Write(data)
	return err
}
// HandleGetClientData responds with the client data record, with its
// Migration field set to the current command-migration info. Any error from
// either lookup is returned to the caller as a JSON error.
func HandleGetClientData(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	clientData, err := sstore.EnsureClientData(ctx)
	if err != nil {
		WriteJsonError(w, err)
		return
	}
	migrationInfo, err := sstore.GetCmdMigrationInfo(ctx)
	if err != nil {
		WriteJsonError(w, err)
		return
	}
	clientData.Migration = migrationInfo
	WriteJsonSuccess(w, clientData)
}
// HandleSetWinSize decodes a sstore.ClientWinSizeType from the JSON request
// body and stores it via sstore.SetWinSize. It replies with JSON success
// (data: true) or a JSON error describing which step failed.
func HandleSetWinSize(w http.ResponseWriter, r *http.Request) {
	var winSize sstore.ClientWinSizeType
	if err := json.NewDecoder(r.Body).Decode(&winSize); err != nil {
		WriteJsonError(w, fmt.Errorf("error decoding json: %w", err))
		return
	}
	if err := sstore.SetWinSize(r.Context(), winSize); err != nil {
		WriteJsonError(w, fmt.Errorf("error setting winsize: %w", err))
		return
	}
	WriteJsonSuccess(w, true)
}
// HandleLogActiveState records one tick of client activity.
//
// The JSON body is a ClientActiveState; each flag that is set contributes one
// minute to the matching counter of the activity update. The current number
// of remotes is always included.
//
// params: fg, active, open
func HandleLogActiveState(w http.ResponseWriter, r *http.Request) {
	var clientState ClientActiveState
	if err := json.NewDecoder(r.Body).Decode(&clientState); err != nil {
		WriteJsonError(w, fmt.Errorf("error decoding json: %w", err))
		return
	}

	var update sstore.ActivityUpdate
	if clientState.Fg {
		update.FgMinutes = 1
	}
	if clientState.Active {
		update.ActiveMinutes = 1
	}
	if clientState.Open {
		update.OpenMinutes = 1
	}
	update.NumConns = remote.NumRemotes()

	if err := sstore.UpdateCurrentActivity(r.Context(), update); err != nil {
		WriteJsonError(w, fmt.Errorf("error updating activity: %w", err))
		return
	}
	WriteJsonSuccess(w, true)
}
// HandleGetScreenLines returns the lines of one screen as JSON.
// The "screenid" query parameter must parse as a UUID; otherwise a JSON error
// is returned without touching the store.
//
// params: screenid
func HandleGetScreenLines(w http.ResponseWriter, r *http.Request) {
	screenId := r.URL.Query().Get("screenid")
	_, parseErr := uuid.Parse(screenId)
	if parseErr != nil {
		WriteJsonError(w, fmt.Errorf("invalid screenid: %w", parseErr))
		return
	}
	lines, err := sstore.GetScreenLinesById(r.Context(), screenId)
	if err != nil {
		WriteJsonError(w, err)
		return
	}
	WriteJsonSuccess(w, lines)
}
// HandleRtnState writes the return-state diff for a command as the raw
// response body.
//
// Both "screenid" and "cmdid" query parameters are required and must parse as
// UUIDs. Every failure (missing/invalid parameter, lookup error, or a panic
// recovered from the handler) is reported as a plain-text body with status 500.
//
// params: screenid, cmdid
func HandleRtnState(w http.ResponseWriter, r *http.Request) {
	// fail sends a plain-text 500 response.
	fail := func(msg string) {
		w.WriteHeader(500)
		w.Write([]byte(msg))
	}
	defer func() {
		panicVal := recover()
		if panicVal == nil {
			return
		}
		log.Printf("[error] in handlertnstate: %v\n", panicVal)
		debug.PrintStack()
		fail(fmt.Sprintf("panic: %v", panicVal))
	}()

	query := r.URL.Query()
	screenId, cmdId := query.Get("screenid"), query.Get("cmdid")
	if screenId == "" || cmdId == "" {
		fail("must specify screenid and cmdid")
		return
	}
	if _, err := uuid.Parse(screenId); err != nil {
		fail(fmt.Sprintf("invalid screenid: %v", err))
		return
	}
	if _, err := uuid.Parse(cmdId); err != nil {
		fail(fmt.Sprintf("invalid cmdid: %v", err))
		return
	}

	diff, err := rtnstate.GetRtnStateDiff(r.Context(), screenId, cmdId)
	if err != nil {
		fail(fmt.Sprintf("cannot get rtnstate diff: %v", err))
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write(diff)
}
// HandleRemotePty writes the pty data of a remote as the raw response body.
//
// The "remoteid" query parameter is required and must parse as a UUID. On
// success the offset returned by remote.ReadRemotePty is sent in the
// X-PtyDataOffset header. Failures are plain-text bodies with status 500.
//
// params: remoteid
func HandleRemotePty(w http.ResponseWriter, r *http.Request) {
	// fail sends a plain-text 500 response.
	fail := func(msg string) {
		w.WriteHeader(500)
		w.Write([]byte(msg))
	}

	remoteId := r.URL.Query().Get("remoteid")
	if remoteId == "" {
		fail("must specify remoteid")
		return
	}
	if _, err := uuid.Parse(remoteId); err != nil {
		fail(fmt.Sprintf("invalid remoteid: %v", err))
		return
	}

	offset, ptyData, err := remote.ReadRemotePty(r.Context(), remoteId)
	if err != nil {
		fail(fmt.Sprintf("error reading ptyout file: %v", err))
		return
	}
	// headers must be set before WriteHeader
	w.Header().Set("X-PtyDataOffset", strconv.FormatInt(offset, 10))
	w.WriteHeader(http.StatusOK)
	w.Write(ptyData)
}
// HandleGetPtyOut writes the full pty output file of a command as the raw
// response body.
//
// Both "screenid" and "cmdid" query parameters are required and must parse as
// UUIDs. A missing output file (fs.ErrNotExist) is not an error: the response
// is an empty 200. On success the offset returned by the store is sent in the
// X-PtyDataOffset header. Other failures are plain-text bodies with status 500.
//
// params: screenid, cmdid
func HandleGetPtyOut(w http.ResponseWriter, r *http.Request) {
	// fail sends a plain-text 500 response.
	fail := func(msg string) {
		w.WriteHeader(500)
		w.Write([]byte(msg))
	}

	query := r.URL.Query()
	screenId, cmdId := query.Get("screenid"), query.Get("cmdid")
	if screenId == "" || cmdId == "" {
		fail("must specify screenid and cmdid")
		return
	}
	if _, err := uuid.Parse(screenId); err != nil {
		fail(fmt.Sprintf("invalid screenid: %v", err))
		return
	}
	if _, err := uuid.Parse(cmdId); err != nil {
		fail(fmt.Sprintf("invalid cmdid: %v", err))
		return
	}

	offset, ptyData, err := sstore.ReadFullPtyOutFile(r.Context(), screenId, cmdId)
	switch {
	case err == nil:
		// headers must be set before WriteHeader
		w.Header().Set("X-PtyDataOffset", strconv.FormatInt(offset, 10))
		w.WriteHeader(http.StatusOK)
		w.Write(ptyData)
	case errors.Is(err, fs.ErrNotExist):
		// no output file: reply with an empty 200
		w.WriteHeader(http.StatusOK)
	default:
		fail(fmt.Sprintf("error reading ptyout file: %v", err))
	}
}
// WriteJsonError sends errVal to the client as the JSON object
// {"error": "<message>"}.
//
// The HTTP status is 200: the failure is signalled by the "error" key in the
// body, not by the status code. errVal must be non-nil.
func WriteJsonError(w http.ResponseWriter, errVal error) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	payload := map[string]interface{}{
		"error": errVal.Error(),
	}
	// marshalling a map holding a single string value; the error is ignored
	body, _ := json.Marshal(payload)
	w.Write(body)
}
// WriteJsonSuccess sends a JSON success envelope with status 200:
// {"success": true} plus a "data" key holding data when data is non-nil.
//
// If the envelope cannot be marshalled, the marshal error is sent through
// WriteJsonError instead.
func WriteJsonSuccess(w http.ResponseWriter, data interface{}) {
	w.Header().Set("Content-Type", "application/json")
	envelope := map[string]interface{}{"success": true}
	if data != nil {
		envelope["data"] = data
	}
	body, err := json.Marshal(envelope)
	if err != nil {
		WriteJsonError(w, err)
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write(body)
}
// HandleRunCommand decodes a scpacket.FeCommandPacketType from the JSON
// request body, runs it through cmdrunner.HandleCommand, and replies with the
// resulting update as JSON. Decode errors, command errors, and panics
// recovered from the handler are all reported as JSON errors.
func HandleRunCommand(w http.ResponseWriter, r *http.Request) {
	defer func() {
		if panicVal := recover(); panicVal != nil {
			log.Printf("[error] in run-command: %v\n", panicVal)
			debug.PrintStack()
			WriteJsonError(w, fmt.Errorf("panic: %v", panicVal))
		}
	}()

	w.Header().Set("Cache-Control", "no-cache")

	var commandPk scpacket.FeCommandPacketType
	if err := json.NewDecoder(r.Body).Decode(&commandPk); err != nil {
		WriteJsonError(w, fmt.Errorf("error decoding json: %w", err))
		return
	}
	update, err := cmdrunner.HandleCommand(r.Context(), &commandPk)
	if err != nil {
		WriteJsonError(w, err)
		return
	}
	WriteJsonSuccess(w, update)
}
// AuthKeyWrap returns a handler that runs fn only for requests carrying the
// correct auth key.
//
// The key is read from the X-AuthKey request header and must match
// GlobalAuthKey. A missing or wrong key is answered with status 500 and a
// plain-text message, and fn is not called. For accepted requests the
// response gets "Cache-Control: no-cache" before fn runs.
//
// NOTE(review): 401/403 would describe these failures better than 500; the
// existing status codes are kept so current clients see no change.
func AuthKeyWrap(fn WebFnType) WebFnType {
	return func(w http.ResponseWriter, r *http.Request) {
		reqAuthKey := r.Header.Get("X-AuthKey")
		if reqAuthKey == "" {
			w.WriteHeader(500)
			w.Write([]byte("no x-authkey header"))
			return
		}
		// The header is attacker-controlled and the key is a secret, so compare
		// in constant time rather than with !=, which can stop at the first
		// differing byte and leak how much of the key matched.
		if subtle.ConstantTimeCompare([]byte(reqAuthKey), []byte(GlobalAuthKey)) != 1 {
			w.WriteHeader(500)
			w.Write([]byte("x-authkey header is invalid"))
			return
		}
		w.Header().Set("Cache-Control", "no-cache")
		fn(w, r)
	}
}
// runWebSocketServer serves the /ws endpoint (HandleWs) on the websocket
// address -- the dev address when scbase.IsDevMode() is true. It blocks in
// ListenAndServe and logs the error that call returns.
func runWebSocketServer() {
	router := mux.NewRouter()
	router.HandleFunc("/ws", HandleWs)

	addr := WebSocketServerAddr
	if scbase.IsDevMode() {
		addr = WebSocketServerDevAddr
	}

	wsServer := &http.Server{
		Addr:           addr,
		Handler:        router,
		ReadTimeout:    HttpReadTimeout,
		WriteTimeout:   HttpWriteTimeout,
		MaxHeaderBytes: HttpMaxHeaderBytes,
	}
	wsServer.SetKeepAlivesEnabled(false)

	log.Printf("Running websocket server on %s\n", addr)
	if err := wsServer.ListenAndServe(); err != nil {
		log.Printf("[error] trying to run websocket server: %v\n", err)
	}
}
// test is the hook run by main when the process is started with "--test".
// It currently does nothing and always returns nil.
func test() error {
	return nil
}
// sendTelemetryWrapper makes one telemetry send attempt with a 5 second
// timeout. It never propagates failure: an error from the send is logged, and
// a panic is recovered and logged together with a stack trace.
func sendTelemetryWrapper() {
	defer func() {
		if panicVal := recover(); panicVal != nil {
			log.Printf("[error] in sendTelemetryWrapper: %v\n", panicVal)
			debug.PrintStack()
		}
	}()

	ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFn()

	if err := pcloud.SendTelemetry(ctx, false); err != nil {
		log.Printf("[error] sending telemetry: %v\n", err)
	}
}
// telemetryLoop sends telemetry periodically and never returns.
//
// After an initial wait of InitialTelemetryWait it wakes every TelemetryTick
// and sends when nothing has been sent yet or at least TelemetryInterval has
// passed since the last send. The send time is recorded before the attempt,
// so a failed attempt is not retried until the next interval.
func telemetryLoop() {
	time.Sleep(InitialTelemetryWait)

	var lastSent time.Time
	for {
		due := lastSent.IsZero() || time.Since(lastSent) >= TelemetryInterval
		if due {
			lastSent = time.Now()
			sendTelemetryWrapper()
		}
		time.Sleep(TelemetryTick)
	}
}
// stdinReadWatch watches stdin and kills the server once stdin is closed.
//
// It reads (and discards) stdin in a loop; the first read error -- including
// EOF -- triggers doShutdown with the error in the reason, after which the
// function returns.
func stdinReadWatch() {
	scratch := make([]byte, 1024)
	for {
		if _, err := os.Stdin.Read(scratch); err != nil {
			doShutdown(fmt.Sprintf("stdin closed/error (%v)", err))
			return
		}
	}
}
// installSignalHandlers arranges for SIGHUP to shut the server down.
//
// SIGHUP is routed to a channel, and a goroutine waits for the first signal
// delivered there and then calls doShutdown, naming the signal in the reason.
// The goroutine handles one signal and exits.
func installSignalHandlers() {
	hupCh := make(chan os.Signal, 1)
	signal.Notify(hupCh, syscall.SIGHUP)
	go func() {
		if sig, ok := <-hupCh; ok {
			doShutdown(fmt.Sprintf("got signal %v", sig))
		}
	}()
}
func doShutdown(reason string) {
shutdownOnce.Do(func() {
log.Printf("[prompt] local server %v, start shutdown\n", reason)
sendTelemetryWrapper()
log.Printf("[prompt] closing db connection\n")
sstore.CloseDB()
log.Printf("[prompt] *** shutting down local server\n")
time.Sleep(1 * time.Second)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
time.Sleep(5 * time.Second)
syscall.Kill(syscall.Getpid(), syscall.SIGKILL)
})
}
// main starts the local server process.
//
// Special invocations: "--test" runs test() and exits before anything else is
// initialised; an argument starting with "--migrate" is handed to
// sstore.MigrateCommandOpts (after the prompt lock is acquired) and exits.
//
// Normal startup, in order: acquire the prompt lock, read the auth key, run
// migrations, ensure client data / local remote / default session exist, load
// remotes, run non-fatal cleanup steps, start the background goroutines, then
// serve the /api routes on the main server address. main blocks in
// ListenAndServe. Any failure in a required step is logged and main returns.
func main() {
scbase.BuildTime = BuildTime
// "--test": run the test hook only; no lock or database is touched.
if len(os.Args) >= 2 && os.Args[1] == "--test" {
log.Printf("running test fn\n")
err := test()
if err != nil {
log.Printf("[error] %v\n", err)
}
return
}
scHomeDir := scbase.GetPromptHomeDir()
log.Printf("[prompt] *** starting local server\n")
log.Printf("[prompt] local server version %s+%s\n", scbase.PromptVersion, scbase.BuildTime)
log.Printf("[prompt] homedir = %q\n", scHomeDir)
// Refuse to start when the prompt lock cannot be acquired.
scLock, err := scbase.AcquirePromptLock()
if err != nil || scLock == nil {
log.Printf("[error] cannot acquire prompt lock: %v\n", err)
return
}
// "--migrate*": run the requested migration command and exit (errors are only logged).
if len(os.Args) >= 2 && strings.HasPrefix(os.Args[1], "--migrate") {
err := sstore.MigrateCommandOpts(os.Args[1:])
if err != nil {
log.Printf("[error] migrate cmd: %v\n", err)
}
return
}
// Required setup: each failure below aborts startup.
authKey, err := scbase.ReadPromptAuthKey()
if err != nil {
log.Printf("[error] %v\n", err)
return
}
GlobalAuthKey = authKey
err = sstore.TryMigrateUp()
if err != nil {
log.Printf("[error] migrate up: %v\n", err)
return
}
clientData, err := sstore.EnsureClientData(context.Background())
if err != nil {
log.Printf("[error] ensuring client data: %v\n", err)
return
}
log.Printf("userid = %s\n", clientData.UserId)
err = sstore.EnsureLocalRemote(context.Background())
if err != nil {
log.Printf("[error] ensuring local remote: %v\n", err)
return
}
_, err = sstore.EnsureDefaultSession(context.Background())
if err != nil {
log.Printf("[error] ensuring default session: %v\n", err)
return
}
err = remote.LoadRemotes(context.Background())
if err != nil {
log.Printf("[error] loading remotes: %v\n", err)
return
}
// Best-effort steps: failures from here to the activity update are logged but do not stop startup.
err = sstore.HangupAllRunningCmds(context.Background())
if err != nil {
log.Printf("[error] calling HUP on all running commands: %v\n", err)
}
err = sstore.ReInitFocus(context.Background())
if err != nil {
log.Printf("[error] resetting screen focus: %v\n", err)
}
log.Printf("PCLOUD_ENDPOINT=%s\n", pcloud.GetEndpoint())
err = sstore.UpdateCurrentActivity(context.Background(), sstore.ActivityUpdate{NumConns: remote.NumRemotes()}) // set at least one record into activity
if err != nil {
log.Printf("[error] updating activity: %v\n", err)
}
// Background work: signal handling, telemetry, stdin watchdog, websocket server.
installSignalHandlers()
go telemetryLoop()
go stdinReadWatch()
go runWebSocketServer()
// The pcloud update writer is started 10 seconds after the other goroutines.
go func() {
time.Sleep(10 * time.Second)
pcloud.StartUpdateWriter()
}()
go sstore.RunCmdScreenMigration()
// API routes. Every route requires the auth key; only run-command is restricted to POST.
gr := mux.NewRouter()
gr.HandleFunc("/api/ptyout", AuthKeyWrap(HandleGetPtyOut))
gr.HandleFunc("/api/remote-pty", AuthKeyWrap(HandleRemotePty))
gr.HandleFunc("/api/rtnstate", AuthKeyWrap(HandleRtnState))
gr.HandleFunc("/api/get-screen-lines", AuthKeyWrap(HandleGetScreenLines))
gr.HandleFunc("/api/run-command", AuthKeyWrap(HandleRunCommand)).Methods("POST")
gr.HandleFunc("/api/get-client-data", AuthKeyWrap(HandleGetClientData))
gr.HandleFunc("/api/set-winsize", AuthKeyWrap(HandleSetWinSize))
gr.HandleFunc("/api/log-active-state", AuthKeyWrap(HandleLogActiveState))
// Dev mode listens on a separate address.
serverAddr := MainServerAddr
if scbase.IsDevMode() {
serverAddr = MainServerDevAddr
}
// The whole router is wrapped in a TimeoutHandler, so a request running longer
// than HttpTimeoutDuration is answered with "Timeout".
server := &http.Server{
Addr: serverAddr,
ReadTimeout: HttpReadTimeout,
WriteTimeout: HttpWriteTimeout,
MaxHeaderBytes: HttpMaxHeaderBytes,
Handler: http.TimeoutHandler(gr, HttpTimeoutDuration, "Timeout"),
}
server.SetKeepAlivesEnabled(false)
log.Printf("Running main server on %s\n", serverAddr)
// Blocks until the server stops.
err = server.ListenAndServe()
if err != nil {
log.Printf("ERROR: %v\n", err)
}
}