// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

// wave pubsub system
package wps

import (
	"strings"
	"sync"

	"github.com/wavetermdev/waveterm/pkg/util/utilfn"
	"github.com/wavetermdev/waveterm/pkg/waveobj"
)

// this broker interface is mostly generic
// strong typing and event types can be defined elsewhere

const MaxPersist = 4096
const ReMakeArrThreshold = 10 * 1024

type Client interface {
	SendEvent(routeId string, event WaveEvent)
}

type BrokerSubscription struct {
	AllSubs   []string            // routeids subscribed to "all" events
	ScopeSubs map[string][]string // routeids subscribed to specific scopes
	StarSubs  map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them)
}

type persistKey struct {
	Event string
	Scope string
}

type persistEventWrap struct {
	ArrTotalAdds int
	Events       []*WaveEvent
}

type BrokerType struct {
	Lock       *sync.Mutex
	Client     Client
	SubMap     map[string]*BrokerSubscription
	PersistMap map[persistKey]*persistEventWrap
}

var Broker = &BrokerType{
	Lock:       &sync.Mutex{},
	SubMap:     make(map[string]*BrokerSubscription),
	PersistMap: make(map[persistKey]*persistEventWrap),
}

func scopeHasStarMatch(scope string) bool {
	parts := strings.Split(scope, ":")
	for _, part := range parts {
		if part == "*" || part == "**" {
			return true
		}
	}
	return false
}

func (b *BrokerType) SetClient(client Client) {
	b.Lock.Lock()
	defer b.Lock.Unlock()
	b.Client = client
}

func (b *BrokerType) GetClient() Client {
	b.Lock.Lock()
	defer b.Lock.Unlock()
	return b.Client
}

// if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one)
func (b *BrokerType) Subscribe(subRouteId string, sub SubscriptionRequest) {
	// log.Printf("[wps] sub %s %s\n", subRouteId, sub.Event)
	if sub.Event == "" {
		return
	}
	b.Lock.Lock()
	defer b.Lock.Unlock()
	b.unsubscribe_nolock(subRouteId, sub.Event)
	bs := b.SubMap[sub.Event]
	if bs == nil {
		bs = &BrokerSubscription{
			AllSubs:   []string{},
			ScopeSubs: make(map[string][]string),
			StarSubs:  make(map[string][]string),
		}
		b.SubMap[sub.Event] = bs
	}
	if sub.AllScopes {
		bs.AllSubs = utilfn.AddElemToSliceUniq(bs.AllSubs, subRouteId)
		return
	}
	for _, scope := range sub.Scopes {
		starMatch := scopeHasStarMatch(scope)
		if starMatch {
			addStrToScopeMap(bs.StarSubs, scope, subRouteId)
		} else {
			addStrToScopeMap(bs.ScopeSubs, scope, subRouteId)
		}
	}
}

func (bs *BrokerSubscription) IsEmpty() bool {
	return len(bs.AllSubs) == 0 && len(bs.ScopeSubs) == 0 && len(bs.StarSubs) == 0
}

func removeStrFromScopeMap(scopeMap map[string][]string, scope string, routeId string) {
	scopeSubs := scopeMap[scope]
	scopeSubs = utilfn.RemoveElemFromSlice(scopeSubs, routeId)
	if len(scopeSubs) == 0 {
		delete(scopeMap, scope)
	} else {
		scopeMap[scope] = scopeSubs
	}
}

func removeStrFromScopeMapAll(scopeMap map[string][]string, routeId string) {
	for scope, scopeSubs := range scopeMap {
		scopeSubs = utilfn.RemoveElemFromSlice(scopeSubs, routeId)
		if len(scopeSubs) == 0 {
			delete(scopeMap, scope)
		} else {
			scopeMap[scope] = scopeSubs
		}
	}
}

func addStrToScopeMap(scopeMap map[string][]string, scope string, routeId string) {
	scopeSubs := scopeMap[scope]
	scopeSubs = utilfn.AddElemToSliceUniq(scopeSubs, routeId)
	scopeMap[scope] = scopeSubs
}

func (b *BrokerType) Unsubscribe(subRouteId string, eventName string) {
	// log.Printf("[wps] unsub %s %s\n", subRouteId, eventName)
	b.Lock.Lock()
	defer b.Lock.Unlock()
	b.unsubscribe_nolock(subRouteId, eventName)
}

func (b *BrokerType) unsubscribe_nolock(subRouteId string, eventName string) {
	bs := b.SubMap[eventName]
	if bs == nil {
		return
	}
	bs.AllSubs = utilfn.RemoveElemFromSlice(bs.AllSubs, subRouteId)
	for scope := range bs.ScopeSubs {
		removeStrFromScopeMap(bs.ScopeSubs, scope, subRouteId)
	}
	for scope := range bs.StarSubs {
		removeStrFromScopeMap(bs.StarSubs, scope, subRouteId)
	}
	if bs.IsEmpty() {
		delete(b.SubMap, eventName)
	}
}

func (b *BrokerType) UnsubscribeAll(subRouteId string) {
	b.Lock.Lock()
	defer b.Lock.Unlock()
	for eventType, bs := range b.SubMap {
		bs.AllSubs = utilfn.RemoveElemFromSlice(bs.AllSubs, subRouteId)
		removeStrFromScopeMapAll(bs.StarSubs, subRouteId)
		removeStrFromScopeMapAll(bs.ScopeSubs, subRouteId)
		if bs.IsEmpty() {
			delete(b.SubMap, eventType)
		}
	}
}

// does not take wildcards, use "" for all
func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*WaveEvent {
	if maxItems <= 0 {
		return nil
	}
	b.Lock.Lock()
	defer b.Lock.Unlock()
	key := persistKey{Event: eventType, Scope: scope}
	pe := b.PersistMap[key]
	if pe == nil || len(pe.Events) == 0 {
		return nil
	}
	if maxItems > len(pe.Events) {
		maxItems = len(pe.Events)
	}
	// return new arr
	rtn := make([]*WaveEvent, maxItems)
	copy(rtn, pe.Events[len(pe.Events)-maxItems:])
	return rtn
}

func (b *BrokerType) persistEvent(event WaveEvent) {
	if event.Persist <= 0 {
		return
	}
	numPersist := event.Persist
	if numPersist > MaxPersist {
		numPersist = MaxPersist
	}
	scopeMap := make(map[string]bool)
	for _, scope := range event.Scopes {
		scopeMap[scope] = true
	}
	scopeMap[""] = true
	b.Lock.Lock()
	defer b.Lock.Unlock()
	for scope := range scopeMap {
		key := persistKey{Event: event.Event, Scope: scope}
		pe := b.PersistMap[key]
		if pe == nil {
			pe = &persistEventWrap{
				ArrTotalAdds: 0,
				Events:       make([]*WaveEvent, 0, event.Persist),
			}
			b.PersistMap[key] = pe
		}
		pe.Events = append(pe.Events, &event)
		pe.ArrTotalAdds++
		if pe.ArrTotalAdds > ReMakeArrThreshold {
			pe.Events = append([]*WaveEvent{}, pe.Events...)
			pe.ArrTotalAdds = len(pe.Events)
		}
	}
}

func (b *BrokerType) Publish(event WaveEvent) {
	// log.Printf("BrokerType.Publish: %v\n", event)
	if event.Persist > 0 {
		b.persistEvent(event)
	}
	client := b.GetClient()
	if client == nil {
		return
	}
	routeIds := b.getMatchingRouteIds(event)
	for _, routeId := range routeIds {
		client.SendEvent(routeId, event)
	}
}

func (b *BrokerType) SendUpdateEvents(updates waveobj.UpdatesRtnType) {
	for _, update := range updates {
		b.Publish(WaveEvent{
			Event:  Event_WaveObjUpdate,
			Scopes: []string{waveobj.MakeORef(update.OType, update.OID).String()},
			Data:   update,
		})
	}
}

func (b *BrokerType) getMatchingRouteIds(event WaveEvent) []string {
	b.Lock.Lock()
	defer b.Lock.Unlock()
	bs := b.SubMap[event.Event]
	if bs == nil {
		return nil
	}
	routeIds := make(map[string]bool)
	for _, routeId := range bs.AllSubs {
		routeIds[routeId] = true
	}
	for _, scope := range event.Scopes {
		for _, routeId := range bs.ScopeSubs[scope] {
			routeIds[routeId] = true
		}
		for starScope := range bs.StarSubs {
			if utilfn.StarMatchString(starScope, scope, ":") {
				for _, routeId := range bs.StarSubs[starScope] {
					routeIds[routeId] = true
				}
			}
		}
	}
	var rtn []string
	for routeId := range routeIds {
		rtn = append(rtn, routeId)
	}
	// log.Printf("getMatchingRouteIds %v %v\n", event, rtn)
	return rtn
}