better way to handle updates -- use a dedup across all updates, pack as many updates as possible into the packet

This commit is contained in:
sawka 2023-03-31 18:15:51 -07:00
parent 5e5a6aa53f
commit f808c7b362
4 changed files with 154 additions and 42 deletions

View File

@ -12,6 +12,11 @@ type DBMappable interface {
UseDBMap()
}
type MapEntry[T any] struct {
Key string
Val T
}
type MapConverter interface {
ToMap() map[string]interface{}
FromMap(map[string]interface{}) bool
@ -88,6 +93,19 @@ func SelectMapsGen[PT MapConverterPtr[T], T any](tx *txwrap.TxWrap, query string
return rtn
}
func SelectSimpleMap[T any](tx *txwrap.TxWrap, query string, args ...interface{}) map[string]T {
var rtn []MapEntry[T]
tx.Select(&rtn, query, args...)
if len(rtn) == 0 {
return nil
}
rtnMap := make(map[string]T)
for _, entry := range rtn {
rtnMap[entry.Key] = entry.Val
}
return rtnMap
}
func MakeGenMap[T HasSimpleKey](arr []T) map[string]T {
rtn := make(map[string]T)
for _, val := range arr {

View File

@ -26,8 +26,14 @@ const PCloudEndpointVarName = "PCLOUD_ENDPOINT"
const APIVersion = 1
const MaxPtyUpdateSize = (128 * 1024)
const MaxUpdatesPerReq = 10
const MaxUpdatesToDeDup = 1000
const MaxUpdateWriterErrors = 3
const PCloudDefaultTimeout = 5 * time.Second
const PCloudWebShareUpdateTimeout = 15 * time.Second
// setting to 1M to be safe (max is 6M for API-GW + Lambda, but there is base64 encoding and upload time)
// we allow one extra update past this estimated size
const MaxUpdatePayloadSize = 1 * (1024 * 1024)
const TelemetryUrl = "/telemetry"
const NoTelemetryUrl = "/no-telemetry"
@ -366,27 +372,20 @@ type webShareResponseType struct {
Data []*WebShareUpdateResponseType `json:"data"`
}
func convertUpdates(updateArr []*sstore.ScreenUpdateType) []*WebShareUpdateType {
var webUpdates []*WebShareUpdateType
for _, update := range updateArr {
webUpdate, err := makeWebShareUpdate(context.Background(), update)
if err != nil || webUpdate == nil {
// log error (if there is one), remove update, and continue
if err != nil {
log.Printf("[pcloud] error create web-share update updateid:%d: %v", update.UpdateId, err)
}
if update.UpdateType == sstore.UpdateType_PtyPos {
err = sstore.RemoveScreenUpdate(context.Background(), update.UpdateId)
}
if err != nil {
// ignore this error too (although this is really problematic, there is nothing to do)
log.Printf("[pcloud] error removing screen update updateid:%d: %v", update.UpdateId, err)
}
continue
func convertUpdate(update *sstore.ScreenUpdateType) *WebShareUpdateType {
webUpdate, err := makeWebShareUpdate(context.Background(), update)
if err != nil || webUpdate == nil {
if err != nil {
log.Printf("[pcloud] error create web-share update updateid:%d: %v", update.UpdateId, err)
}
// if err, or no web update created, remove the screenupdate
removeErr := sstore.RemoveScreenUpdate(context.Background(), update.UpdateId)
if removeErr != nil {
// ignore this error too (although this is really problematic, there is nothing to do)
log.Printf("[pcloud] error removing screen update updateid:%d: %v", update.UpdateId, removeErr)
}
webUpdates = append(webUpdates, webUpdate)
}
return webUpdates
return webUpdate
}
func DoSyncWebUpdate(webUpdate *WebShareUpdateType) error {
@ -423,7 +422,7 @@ func DoWebUpdates(webUpdates []*WebShareUpdateType) error {
if err != nil {
return fmt.Errorf("could not get authinfo for request: %v", err)
}
ctx, cancelFn := context.WithTimeout(context.Background(), PCloudDefaultTimeout)
ctx, cancelFn := context.WithTimeout(context.Background(), PCloudWebShareUpdateTimeout)
defer cancelFn()
req, err := makeAuthPostReq(ctx, WebShareUpdateUrl, authInfo, webUpdates)
if err != nil {
@ -477,9 +476,9 @@ func StartUpdateWriter() {
func computeBackoff(numFailures int) time.Duration {
switch numFailures {
case 1:
return 100 * time.Millisecond
return 500 * time.Millisecond
case 2:
return 1 * time.Second
return 2 * time.Second
case 3:
return 5 * time.Second
case 4:
@ -493,6 +492,34 @@ func computeBackoff(numFailures int) time.Duration {
}
}
type updateKey struct {
ScreenId string
LineId string
UpdateType string
}
func DeDupUpdates(ctx context.Context, updateArr []*sstore.ScreenUpdateType) ([]*sstore.ScreenUpdateType, error) {
var rtn []*sstore.ScreenUpdateType
var idsToDelete []int64
umap := make(map[updateKey]bool)
for _, update := range updateArr {
key := updateKey{ScreenId: update.ScreenId, LineId: update.LineId, UpdateType: update.UpdateType}
if umap[key] {
idsToDelete = append(idsToDelete, update.UpdateId)
continue
}
umap[key] = true
rtn = append(rtn, update)
}
if len(idsToDelete) > 0 {
err := sstore.RemoveScreenUpdates(ctx, idsToDelete)
if err != nil {
return nil, fmt.Errorf("error trying to delete screenupdates: %v\n", err)
}
}
return rtn, nil
}
func runWebShareUpdateWriter() {
defer func() {
setUpdateWriterRunning(false)
@ -501,32 +528,58 @@ func runWebShareUpdateWriter() {
numErrors := 0
numSendErrors := 0
for {
if numErrors > MaxUpdateWriterErrors {
log.Printf("[pcloud] update-writer, too many errors, exiting\n")
break
}
time.Sleep(100 * time.Millisecond)
updateArr, err := sstore.GetScreenUpdates(context.Background(), MaxUpdatesPerReq)
fullUpdateArr, err := sstore.GetScreenUpdates(context.Background(), MaxUpdatesToDeDup)
if err != nil {
log.Printf("[pcloud] error retrieving updates: %v", err)
time.Sleep(1 * time.Second)
numErrors++
if numErrors > MaxUpdateWriterErrors {
log.Printf("[pcloud] update-writer, too many read errors, exiting\n")
break
}
continue
}
if len(updateArr) == 0 {
sstore.UpdateWriterCheckMoreData()
updateArr, err := DeDupUpdates(context.Background(), fullUpdateArr)
if err != nil {
log.Printf("[pcloud] error deduping screenupdates: %v", err)
time.Sleep(1 * time.Second)
numErrors++
continue
}
numErrors = 0
webUpdates := convertUpdates(updateArr)
err = DoWebUpdates(webUpdates)
var webUpdateArr []*WebShareUpdateType
totalSize := 0
for _, update := range updateArr {
webUpdate := convertUpdate(update)
if webUpdate == nil {
continue
}
webUpdateArr = append(webUpdateArr, webUpdate)
totalSize += webUpdate.GetEstimatedSize()
if totalSize > MaxUpdatePayloadSize {
break
}
}
if len(webUpdateArr) == 0 {
sstore.UpdateWriterCheckMoreData()
continue
}
err = DoWebUpdates(webUpdateArr)
if err != nil {
numSendErrors++
backoffTime := computeBackoff(numSendErrors)
log.Printf("[pcloud] error processing web-updates (backoff=%v): %v\n", backoffTime, err)
log.Printf("[pcloud] error processing %d web-updates (backoff=%v): %v\n", len(webUpdateArr), backoffTime, err)
time.Sleep(backoffTime)
continue
}
log.Printf("[pcloud] sent %d web-updates\n", len(updateArr))
log.Printf("[pcloud] sent %d web-updates\n", len(webUpdateArr))
var debugStrs []string
for _, webUpdate := range webUpdateArr {
debugStrs = append(debugStrs, webUpdate.String())
}
log.Printf("[pcloud] updates: %s\n", strings.Join(debugStrs, " "))
numSendErrors = 0
}
}

View File

@ -2,6 +2,7 @@ package pcloud
import (
"context"
"encoding/json"
"fmt"
"github.com/scripthaus-dev/mshell/pkg/packet"
@ -40,6 +41,26 @@ type WebShareUpdateType struct {
TermOpts *sstore.TermOpts `json:"termopts,omitempty"`
}
const EstimatedSizePadding = 100
func (update *WebShareUpdateType) GetEstimatedSize() int {
barr, _ := json.Marshal(update)
return len(barr) + 100
}
func (update *WebShareUpdateType) String() string {
var idStr string
if update.LineId != "" && update.ScreenId != "" {
idStr = fmt.Sprintf("%s:%s", update.ScreenId[0:8], update.LineId[0:8])
} else if update.ScreenId != "" {
idStr = update.ScreenId[0:8]
}
if update.UpdateType == sstore.UpdateType_PtyPos && update.PtyData != nil {
return fmt.Sprintf("ptydata[%s][%d:%d]", idStr, update.PtyData.PtyPos, len(update.PtyData.Data))
}
return fmt.Sprintf("%s[%s]", update.UpdateType, idStr)
}
type WebShareUpdateResponseType struct {
UpdateId int64 `json:"updateid"`
Success bool `json:"success"`
@ -105,7 +126,6 @@ type WebShareLineType struct {
Renderer string `json:"renderer,omitempty"`
Text string `json:"text,omitempty"`
CmdId string `json:"cmdid,omitempty"`
Archived bool `json:"archived,omitempty"`
}
func webLineFromLine(line *sstore.LineType) (*WebShareLineType, error) {
@ -118,7 +138,6 @@ func webLineFromLine(line *sstore.LineType) (*WebShareLineType, error) {
Renderer: line.Renderer,
Text: line.Text,
CmdId: line.CmdId,
Archived: line.Archived,
}
return rtn, nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"strconv"
"strings"
"sync"
@ -23,7 +24,6 @@ const HistoryCols = "h.historyid, h.ts, h.userid, h.sessionid, h.screenid, h.lin
const DefaultMaxHistoryItems = 1000
var updateWriterCVar = sync.NewCond(&sync.Mutex{})
var updateWriterMoreData = false
var WebScreenPtyPosLock = &sync.Mutex{}
var WebScreenPtyPosDelIntent = make(map[string]bool) // map[screenid + ":" + lineid] -> bool
@ -57,18 +57,25 @@ func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error {
}
func NotifyUpdateWriter() {
updateWriterCVar.L.Lock()
defer updateWriterCVar.L.Unlock()
updateWriterMoreData = true
updateWriterCVar.Signal()
// must happen in a goroutine to prevent deadlock.
// update-writer holds this lock while reading from the DB. we can't be holding the DB lock while calling this!
go func() {
updateWriterCVar.L.Lock()
defer updateWriterCVar.L.Unlock()
updateWriterCVar.Signal()
}()
}
func UpdateWriterCheckMoreData() {
updateWriterCVar.L.Lock()
defer updateWriterCVar.L.Unlock()
for {
if updateWriterMoreData {
updateWriterMoreData = false
updateCount, err := CountScreenUpdates(context.Background())
if err != nil {
log.Printf("ERROR getting screen update count (sleeping): %v", err)
// will just lead to a Wait()
}
if updateCount > 0 {
break
}
updateWriterCVar.Wait()
@ -2565,6 +2572,21 @@ func RemoveScreenUpdate(ctx context.Context, updateId int64) error {
})
}
func CountScreenUpdates(ctx context.Context) (int, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (int, error) {
query := `SELECT count(*) FROM screenupdate`
return tx.GetInt(query), nil
})
}
func RemoveScreenUpdates(ctx context.Context, updateIds []int64) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `DELETE FROM screenupdate WHERE updateid IN (SELECT value FROM json_each(?))`
tx.Exec(query, quickJsonArr(updateIds))
return nil
})
}
func MaybeInsertPtyPosUpdate(ctx context.Context, screenId string, cmdId string) error {
return WithTx(ctx, func(tx *TxWrap) error {
if !isWebShare(tx, screenId) {