diff --git a/libs/common/src/tools/generator/state/buffered-key-definition.spec.ts b/libs/common/src/tools/generator/state/buffered-key-definition.spec.ts new file mode 100644 index 0000000000..b056cba397 --- /dev/null +++ b/libs/common/src/tools/generator/state/buffered-key-definition.spec.ts @@ -0,0 +1,119 @@ +import { GENERATOR_DISK, UserKeyDefinition } from "../../../platform/state"; + +import { BufferedKeyDefinition } from "./buffered-key-definition"; + +describe("BufferedKeyDefinition", () => { + const deserializer = (jsonValue: number) => jsonValue + 1; + + describe("toKeyDefinition", () => { + it("should create a key definition", () => { + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { + deserializer, + cleanupDelayMs: 5, + clearOn: [], + }); + + const result = key.toKeyDefinition(); + + expect(result).toBeInstanceOf(UserKeyDefinition); + expect(result.stateDefinition).toBe(GENERATOR_DISK); + expect(result.key).toBe("test"); + expect(result.deserializer(1)).toEqual(2); + expect(result.cleanupDelayMs).toEqual(5); + }); + }); + + describe("shouldOverwrite", () => { + it("should call the shouldOverwrite function when its defined", async () => { + const shouldOverwrite = jest.fn(() => true); + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { + deserializer, + shouldOverwrite, + clearOn: [], + }); + + const result = await key.shouldOverwrite(true); + + expect(shouldOverwrite).toHaveBeenCalledWith(true); + expect(result).toStrictEqual(true); + }); + + it("should return true when shouldOverwrite is not defined and the input is truthy", async () => { + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { + deserializer, + clearOn: [], + }); + + const result = await key.shouldOverwrite(1); + + expect(result).toStrictEqual(true); + }); + + it("should return false when shouldOverwrite is not defined and the input is falsy", async () => { + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { + deserializer, + clearOn: [], + }); + + const result = await key.shouldOverwrite(0); + + expect(result).toStrictEqual(false); + }); + }); + + describe("map", () => { + it("should call the map function when its defined", async () => { + const map = jest.fn((value: number) => Promise.resolve(`${value}`)); + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { + deserializer, + map, + clearOn: [], + }); + + const result = await key.map(1, true); + + expect(map).toHaveBeenCalledWith(1, true); + expect(result).toStrictEqual("1"); + }); + + it("should fall back to an identity function when map is not defined", async () => { + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { deserializer, clearOn: [] }); + + const result = await key.map(1, null); + + expect(result).toStrictEqual(1); + }); + }); + + describe("isValid", () => { + it("should call the isValid function when its defined", async () => { + const isValid = jest.fn(() => Promise.resolve(true)); + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { + deserializer, + isValid, + clearOn: [], + }); + + const result = await key.isValid(1, true); + + expect(isValid).toHaveBeenCalledWith(1, true); + expect(result).toStrictEqual(true); + }); + + it("should return true when isValid is not defined and the input is truthy", async () => { + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { deserializer, clearOn: [] }); + + const result = await key.isValid(1, null); + + expect(result).toStrictEqual(true); + }); + + it("should return false when isValid is not defined and the input is falsy", async () => { + const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { deserializer, clearOn: [] }); + + const result = await key.isValid(0, null); + + expect(result).toStrictEqual(false); + }); + }); +}); diff --git a/libs/common/src/tools/generator/state/buffered-key-definition.ts b/libs/common/src/tools/generator/state/buffered-key-definition.ts new file mode 100644 index 0000000000..5457410f80 --- /dev/null +++ b/libs/common/src/tools/generator/state/buffered-key-definition.ts @@ -0,0 +1,100 @@ +import { UserKeyDefinition, UserKeyDefinitionOptions } from "../../../platform/state"; +// eslint-disable-next-line -- `StateDefinition` used as an argument +import { StateDefinition } from "../../../platform/state/state-definition"; + +/** A set of options for customizing the behavior of a {@link BufferedKeyDefinition} + */ +export type BufferedKeyDefinitionOptions = + UserKeyDefinitionOptions & { + /** Checks whether the input type can be converted to the output type. + * @param input the data that is rolling over. + * @returns `true` if the definition is valid, otherwise `false`. If this + * function is not specified, any truthy input is valid. + * + * @remarks this is intended for cases where you're working with validated or + * signed data. It should be used to prevent data from being "laundered" through + * synchronized state. + */ + isValid?: (input: Input, dependency: Dependency) => Promise; + + /** Transforms the input data format to its output format. + * @param input the data that is rolling over. + * @returns the converted value. If this function is not specified, the value + * is asserted as the output type. + * + * @remarks This is intended for converting between, say, a replication format + * and a disk format or rotating encryption keys. + */ + map?: (input: Input, dependency: Dependency) => Promise; + + /** Checks whether an overwrite should occur + * @param dependency the latest value from the dependency observable provided + * to the buffered state. + * @returns `true` if a overwrite should occur, otherwise `false`. If this + * function is not specified, overwrites occur when the dependency is truthy. + * + * @remarks This is intended for waiting to overwrite until a dependency becomes + * available (e.g. an encryption key or a user confirmation). + */ + shouldOverwrite?: (dependency: Dependency) => boolean; + }; + +/** Storage and mapping settings for data stored by a `BufferedState`. + */ +export class BufferedKeyDefinition { + /** + * Defines a buffered state + * @param stateDefinition The domain of the buffer + * @param key Domain key that identifies the buffered value. This key must + * not be reused in any capacity. + * @param options Configures the operation of the buffered state. + */ + constructor( + readonly stateDefinition: StateDefinition, + readonly key: string, + readonly options: BufferedKeyDefinitionOptions, + ) {} + + /** Converts the buffered key definition to a state provider + * key definition + */ + toKeyDefinition() { + const bufferedKey = new UserKeyDefinition(this.stateDefinition, this.key, this.options); + + return bufferedKey; + } + + /** Checks whether the dependency triggers an overwrite. */ + shouldOverwrite(dependency: Dependency) { + const shouldOverwrite = this.options?.shouldOverwrite; + if (shouldOverwrite) { + return shouldOverwrite(dependency); + } + + return dependency ? true : false; + } + + /** Converts the input data format to its output format. + * @returns the converted value. + */ + map(input: Input, dependency: Dependency) { + const map = this.options?.map; + if (map) { + return map(input, dependency); + } + + return Promise.resolve(input as unknown as Output); + } + + /** Checks whether the input type can be converted to the output type. + * @returns `true` if the definition is valid, otherwise `false`. + */ + isValid(input: Input, dependency: Dependency) { + const isValid = this.options?.isValid; + if (isValid) { + return isValid(input, dependency); + } + + return Promise.resolve(input ? true : false); + } +} diff --git a/libs/common/src/tools/generator/state/buffered-state.spec.ts b/libs/common/src/tools/generator/state/buffered-state.spec.ts new file mode 100644 index 0000000000..7f9722d384 --- /dev/null +++ b/libs/common/src/tools/generator/state/buffered-state.spec.ts @@ -0,0 +1,375 @@ +import { BehaviorSubject, firstValueFrom, of } from "rxjs"; + +import { + mockAccountServiceWith, + FakeStateProvider, + awaitAsync, + trackEmissions, +} from "../../../../spec"; +import { GENERATOR_DISK, KeyDefinition } from "../../../platform/state"; +import { UserId } from "../../../types/guid"; + +import { BufferedKeyDefinition } from "./buffered-key-definition"; +import { BufferedState } from "./buffered-state"; + +const SomeUser = "SomeUser" as UserId; +const accountService = mockAccountServiceWith(SomeUser); +type SomeType = { foo: boolean; bar: boolean }; + +const SOME_KEY = new KeyDefinition(GENERATOR_DISK, "fooBar", { + deserializer: (jsonValue) => jsonValue as SomeType, +}); +const BUFFER_KEY = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + clearOn: [], +}); + +describe("BufferedState", () => { + describe("state$", function () { + it("reads from the output state", async () => { + const provider = new FakeStateProvider(accountService); + const value = { foo: true, bar: false }; + const outputState = provider.getUser(SomeUser, SOME_KEY); + await outputState.update(() => value); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = await firstValueFrom(bufferedState.state$); + + expect(result).toEqual(value); + }); + + it("updates when the output state updates", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + const secondValue = { foo: true, bar: true }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = trackEmissions(bufferedState.state$); + await outputState.update(() => secondValue); + await awaitAsync(); + + expect(result).toEqual([firstValue, secondValue]); + }); + + // this test is important for data migrations, which set + // the buffered state without using the `BufferedState` abstraction. + it.each([[null], [undefined]])( + "reads from the output state when the buffered state is '%p'", + async (bufferValue) => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferValue, SomeUser); + + const result = await firstValueFrom(bufferedState.state$); + + expect(result).toEqual(firstValue); + }, + ); + + // also important for data migrations + it("rolls over pending values from the buffered state immediately by default", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + await outputState.update(() => ({ foo: true, bar: false })); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + const bufferedValue = { foo: true, bar: true }; + await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferedValue, SomeUser); + + const result = await firstValueFrom(bufferedState.state$); + + expect(result).toEqual(bufferedValue); + }); + + // also important for data migrations + it("reads from the output state when its dependency is false", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const value = { foo: true, bar: false }; + await outputState.update(() => value); + const dependency = new BehaviorSubject(false).asObservable(); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState, dependency); + await provider.setUserState(BUFFER_KEY.toKeyDefinition(), { foo: true, bar: true }, SomeUser); + + const result = await firstValueFrom(bufferedState.state$); + + expect(result).toEqual(value); + }); + + // also important for data migrations + it("overwrites the output state when its dependency emits a truthy value", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const dependency = new BehaviorSubject(false); + const bufferedState = new BufferedState( + provider, + BUFFER_KEY, + outputState, + dependency.asObservable(), + ); + const bufferedValue = { foo: true, bar: true }; + await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferedValue, SomeUser); + + const result = trackEmissions(bufferedState.state$); + dependency.next(true); + await awaitAsync(); + + expect(result).toEqual([firstValue, bufferedValue]); + }); + + it("overwrites the output state when shouldOverwrite returns a truthy value", async () => { + const bufferedKey = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + shouldOverwrite: () => true, + clearOn: [], + }); + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + await outputState.update(() => ({ foo: true, bar: false })); + const bufferedState = new BufferedState(provider, bufferedKey, outputState); + const bufferedValue = { foo: true, bar: true }; + await provider.setUserState(bufferedKey.toKeyDefinition(), bufferedValue, SomeUser); + + const result = await firstValueFrom(bufferedState.state$); + + expect(result).toEqual(bufferedValue); + }); + + it("reads from the output state when shouldOverwrite returns a falsy value", async () => { + const bufferedKey = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + shouldOverwrite: () => false, + clearOn: [], + }); + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const value = { foo: true, bar: false }; + await outputState.update(() => value); + const bufferedState = new BufferedState(provider, bufferedKey, outputState); + await provider.setUserState( + bufferedKey.toKeyDefinition(), + { foo: true, bar: true }, + SomeUser, + ); + + const result = await firstValueFrom(bufferedState.state$); + + expect(result).toEqual(value); + }); + + it("replaces the output state when shouldOverwrite transforms its dependency to a truthy value", async () => { + const bufferedKey = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + shouldOverwrite: (dependency) => !dependency, + clearOn: [], + }); + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const dependency = new BehaviorSubject(true); + const bufferedState = new BufferedState( + provider, + bufferedKey, + outputState, + dependency.asObservable(), + ); + const bufferedValue = { foo: true, bar: true }; + await provider.setUserState(bufferedKey.toKeyDefinition(), bufferedValue, SomeUser); + + const result = trackEmissions(bufferedState.state$); + dependency.next(false); + await awaitAsync(); + + expect(result).toEqual([firstValue, bufferedValue]); + }); + }); + + describe("userId", () => { + const AnotherUser = "anotherUser" as UserId; + + it.each([[SomeUser], [AnotherUser]])("gets the userId", (userId) => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(userId, SOME_KEY); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = bufferedState.userId; + + expect(result).toEqual(userId); + }); + }); + + describe("update", () => { + it("updates state$", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + const secondValue = { foo: true, bar: true }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = trackEmissions(bufferedState.state$); + await bufferedState.update(() => secondValue); + await awaitAsync(); + + expect(result).toEqual([firstValue, secondValue]); + }); + + it("respects update options", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + const secondValue = { foo: true, bar: true }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = trackEmissions(bufferedState.state$); + await bufferedState.update(() => secondValue, { + shouldUpdate: (_, latest) => latest, + combineLatestWith: of(false), + }); + await awaitAsync(); + + expect(result).toEqual([firstValue]); + }); + }); + + describe("buffer", () => { + it("updates state$ once per overwrite", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + const secondValue = { foo: true, bar: true }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer(secondValue); + await awaitAsync(); + + expect(result).toEqual([firstValue, secondValue]); + }); + + it("emits the output state when its dependency is false", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const dependency = new BehaviorSubject(false); + const bufferedState = new BufferedState( + provider, + BUFFER_KEY, + outputState, + dependency.asObservable(), + ); + const bufferedValue = { foo: true, bar: true }; + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer(bufferedValue); + await awaitAsync(); + + expect(result).toEqual([firstValue, firstValue]); + }); + + it("replaces the output state when its dependency becomes true", async () => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const dependency = new BehaviorSubject(false); + const bufferedState = new BufferedState( + provider, + BUFFER_KEY, + outputState, + dependency.asObservable(), + ); + const bufferedValue = { foo: true, bar: true }; + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer(bufferedValue); + dependency.next(true); + await awaitAsync(); + + expect(result).toEqual([firstValue, firstValue, bufferedValue]); + }); + + it.each([[null], [undefined]])("ignores `%p`", async (bufferedValue) => { + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer(bufferedValue); + await awaitAsync(); + + expect(result).toEqual([firstValue]); + }); + + it("discards the buffered data when isValid returns false", async () => { + const bufferedKey = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + isValid: () => Promise.resolve(false), + clearOn: [], + }); + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, bufferedKey, outputState); + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer({ foo: true, bar: true }); + await awaitAsync(); + + expect(result).toEqual([firstValue, firstValue]); + }); + + it("overwrites the output when isValid returns true", async () => { + const bufferedKey = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + isValid: () => Promise.resolve(true), + clearOn: [], + }); + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, bufferedKey, outputState); + const bufferedValue = { foo: true, bar: true }; + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer(bufferedValue); + await awaitAsync(); + + expect(result).toEqual([firstValue, bufferedValue]); + }); + + it("maps the buffered data when it overwrites the state", async () => { + const mappedValue = { foo: true, bar: true }; + const bufferedKey = new BufferedKeyDefinition(GENERATOR_DISK, "fooBar_buffer", { + deserializer: (jsonValue) => jsonValue as SomeType, + map: () => Promise.resolve(mappedValue), + clearOn: [], + }); + const provider = new FakeStateProvider(accountService); + const outputState = provider.getUser(SomeUser, SOME_KEY); + const firstValue = { foo: true, bar: false }; + await outputState.update(() => firstValue); + const bufferedState = new BufferedState(provider, bufferedKey, outputState); + + const result = trackEmissions(bufferedState.state$); + await bufferedState.buffer({ foo: false, bar: false }); + await awaitAsync(); + + expect(result).toEqual([firstValue, mappedValue]); + }); + }); +}); diff --git a/libs/common/src/tools/generator/state/buffered-state.ts b/libs/common/src/tools/generator/state/buffered-state.ts new file mode 100644 index 0000000000..42b14b815c --- /dev/null +++ b/libs/common/src/tools/generator/state/buffered-state.ts @@ -0,0 +1,144 @@ +import { Observable, combineLatest, concatMap, filter, map, of } from "rxjs"; + +import { + StateProvider, + SingleUserState, + CombinedState, + StateUpdateOptions, +} from "../../../platform/state"; + +import { BufferedKeyDefinition } from "./buffered-key-definition"; + +/** Stateful storage that overwrites one state with a buffered state. + * When a overwrite occurs, the input state is automatically deleted. + * @remarks The buffered state can only overwrite non-nullish values. If the + * buffer key contains `null` or `undefined`, it will do nothing. + */ +export class BufferedState implements SingleUserState { + /** + * Instantiate a buffered state + * @param provider constructs the buffer. + * @param key defines the buffer location. + * @param output updates when a overwrite occurs + * @param dependency$ provides data the buffer depends upon to evaluate and + * transform its data. If this is omitted, then `true` is injected as + * a dependency, which with a default output will trigger a overwrite immediately. + * + * @remarks `dependency$` enables overwrite control during dynamic circumstances, + * such as when a overwrite should occur only if a user key is available. + */ + constructor( + provider: StateProvider, + private key: BufferedKeyDefinition, + private output: SingleUserState, + dependency$: Observable = null, + ) { + this.bufferState = provider.getUser(output.userId, key.toKeyDefinition()); + + const watching = [ + this.bufferState.state$, + this.output.state$, + dependency$ ?? of(true as unknown as Dependency), + ] as const; + + this.state$ = combineLatest(watching).pipe( + concatMap(async ([input, output, dependency]) => { + const normalized = input ?? null; + + const canOverwrite = normalized !== null && key.shouldOverwrite(dependency); + if (canOverwrite) { + await this.updateOutput(dependency); + + // prevent duplicate updates by suppressing the update + return [false, output] as const; + } + + return [true, output] as const; + }), + filter(([updated]) => updated), + map(([, output]) => output), + ); + + this.combinedState$ = this.state$.pipe(map((state) => [this.output.userId, state])); + + this.bufferState$ = this.bufferState.state$; + } + + private bufferState: SingleUserState; + + private async updateOutput(dependency: Dependency) { + // retrieve the latest input value + let input: Input; + await this.bufferState.update((state) => state, { + shouldUpdate: (state) => { + input = state; + return false; + }, + }); + + // bail if this update lost the race with the last update + if (input === null) { + return; + } + + // destroy invalid data and bail + if (!(await this.key.isValid(input, dependency))) { + await this.bufferState.update(() => null); + return; + } + + // overwrite anything left to the output; the updates need to be awaited with `Promise.all` + // so that `inputState.update(() => null)` runs before `shouldUpdate` reads the value (above). + // This lets the emission from `this.outputState.update` renter the `concatMap`. If the + // awaits run in sequence, it can win the race and cause a double emission. + const output = await this.key.map(input, dependency); + await Promise.all([this.output.update(() => output), this.bufferState.update(() => null)]); + + return; + } + + /** {@link SingleUserState.userId} */ + get userId() { + return this.output.userId; + } + + /** Observes changes to the output state. This updates when the output + * state updates, when the buffer is moved to the output, and when `BufferedState.buffer` + * is invoked. + */ + readonly state$: Observable; + + /** {@link SingleUserState.combinedState$} */ + readonly combinedState$: Observable>; + + /** Buffers a value state. The buffered state overwrites the output + * state when a subscription occurs. + * @param value the state to roll over. Setting this to `null` or `undefined` + * has no effect. + */ + async buffer(value: Input): Promise { + const normalized = value ?? null; + if (normalized !== null) { + await this.bufferState.update(() => normalized); + } + } + + /** The data presently being buffered. This emits the pending value each time + * new buffer data is provided. It emits null when the buffer is empty. + */ + readonly bufferState$: Observable; + + /** Updates the output state. + * @param configureState a callback that returns an updated output + * state. The callback receives the state's present value as its + * first argument and the dependencies listed in `options.combinedLatestWith` + * as its second argument. + * @param options configures how the update is applied. See {@link StateUpdateOptions}. + */ + update( + configureState: (state: Output, dependencies: TCombine) => Output, + options: StateUpdateOptions = null, + ): Promise { + return this.output.update(configureState, options); + } +}