[PM-7419] add buffered state (#8706)

Introduces a state manager that buffers data until an observed dependency signals it should 
overwrite another state manager with the buffered data. It can be used to implement 
migrations of encrypted data, edit-apply loops (such as used for save operations), and to 
map between encryption keys/formats.
This commit is contained in:
✨ Audrey ✨ 2024-04-12 13:31:58 -04:00 committed by GitHub
parent 8a71b50a5e
commit 44d59f0d8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 738 additions and 0 deletions

View File

@ -0,0 +1,119 @@
import { GENERATOR_DISK, UserKeyDefinition } from "../../../platform/state";
import { BufferedKeyDefinition } from "./buffered-key-definition";
describe("BufferedKeyDefinition", () => {
const deserializer = (jsonValue: number) => jsonValue + 1;
describe("toKeyDefinition", () => {
it("should create a key definition", () => {
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", {
deserializer,
cleanupDelayMs: 5,
clearOn: [],
});
const result = key.toKeyDefinition();
expect(result).toBeInstanceOf(UserKeyDefinition);
expect(result.stateDefinition).toBe(GENERATOR_DISK);
expect(result.key).toBe("test");
expect(result.deserializer(1)).toEqual(2);
expect(result.cleanupDelayMs).toEqual(5);
});
});
describe("shouldOverwrite", () => {
it("should call the shouldOverwrite function when its defined", async () => {
const shouldOverwrite = jest.fn(() => true);
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", {
deserializer,
shouldOverwrite,
clearOn: [],
});
const result = await key.shouldOverwrite(true);
expect(shouldOverwrite).toHaveBeenCalledWith(true);
expect(result).toStrictEqual(true);
});
it("should return true when shouldOverwrite is not defined and the input is truthy", async () => {
const key = new BufferedKeyDefinition<number, number, number>(GENERATOR_DISK, "test", {
deserializer,
clearOn: [],
});
const result = await key.shouldOverwrite(1);
expect(result).toStrictEqual(true);
});
it("should return false when shouldOverwrite is not defined and the input is falsy", async () => {
const key = new BufferedKeyDefinition<number, number, number>(GENERATOR_DISK, "test", {
deserializer,
clearOn: [],
});
const result = await key.shouldOverwrite(0);
expect(result).toStrictEqual(false);
});
});
describe("map", () => {
it("should call the map function when its defined", async () => {
const map = jest.fn((value: number) => Promise.resolve(`${value}`));
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", {
deserializer,
map,
clearOn: [],
});
const result = await key.map(1, true);
expect(map).toHaveBeenCalledWith(1, true);
expect(result).toStrictEqual("1");
});
it("should fall back to an identity function when map is not defined", async () => {
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { deserializer, clearOn: [] });
const result = await key.map(1, null);
expect(result).toStrictEqual(1);
});
});
describe("isValid", () => {
it("should call the isValid function when its defined", async () => {
const isValid = jest.fn(() => Promise.resolve(true));
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", {
deserializer,
isValid,
clearOn: [],
});
const result = await key.isValid(1, true);
expect(isValid).toHaveBeenCalledWith(1, true);
expect(result).toStrictEqual(true);
});
it("should return true when isValid is not defined and the input is truthy", async () => {
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { deserializer, clearOn: [] });
const result = await key.isValid(1, null);
expect(result).toStrictEqual(true);
});
it("should return false when isValid is not defined and the input is falsy", async () => {
const key = new BufferedKeyDefinition(GENERATOR_DISK, "test", { deserializer, clearOn: [] });
const result = await key.isValid(0, null);
expect(result).toStrictEqual(false);
});
});
});

View File

@ -0,0 +1,100 @@
import { UserKeyDefinition, UserKeyDefinitionOptions } from "../../../platform/state";
// eslint-disable-next-line -- `StateDefinition` used as an argument
import { StateDefinition } from "../../../platform/state/state-definition";
/** A set of options for customizing the behavior of a {@link BufferedKeyDefinition}
*/
export type BufferedKeyDefinitionOptions<Input, Output, Dependency> =
UserKeyDefinitionOptions<Input> & {
/** Checks whether the input type can be converted to the output type.
* @param input the data that is rolling over.
* @returns `true` if the definition is valid, otherwise `false`. If this
* function is not specified, any truthy input is valid.
*
* @remarks this is intended for cases where you're working with validated or
* signed data. It should be used to prevent data from being "laundered" through
* synchronized state.
*/
isValid?: (input: Input, dependency: Dependency) => Promise<boolean>;
/** Transforms the input data format to its output format.
* @param input the data that is rolling over.
* @returns the converted value. If this function is not specified, the value
* is asserted as the output type.
*
* @remarks This is intended for converting between, say, a replication format
* and a disk format or rotating encryption keys.
*/
map?: (input: Input, dependency: Dependency) => Promise<Output>;
/** Checks whether an overwrite should occur
* @param dependency the latest value from the dependency observable provided
* to the buffered state.
* @returns `true` if a overwrite should occur, otherwise `false`. If this
* function is not specified, overwrites occur when the dependency is truthy.
*
* @remarks This is intended for waiting to overwrite until a dependency becomes
* available (e.g. an encryption key or a user confirmation).
*/
shouldOverwrite?: (dependency: Dependency) => boolean;
};
/** Storage and mapping settings for data stored by a `BufferedState`.
*/
export class BufferedKeyDefinition<Input, Output = Input, Dependency = true> {
/**
* Defines a buffered state
* @param stateDefinition The domain of the buffer
* @param key Domain key that identifies the buffered value. This key must
* not be reused in any capacity.
* @param options Configures the operation of the buffered state.
*/
constructor(
readonly stateDefinition: StateDefinition,
readonly key: string,
readonly options: BufferedKeyDefinitionOptions<Input, Output, Dependency>,
) {}
/** Converts the buffered key definition to a state provider
* key definition
*/
toKeyDefinition() {
const bufferedKey = new UserKeyDefinition<Input>(this.stateDefinition, this.key, this.options);
return bufferedKey;
}
/** Checks whether the dependency triggers an overwrite. */
shouldOverwrite(dependency: Dependency) {
const shouldOverwrite = this.options?.shouldOverwrite;
if (shouldOverwrite) {
return shouldOverwrite(dependency);
}
return dependency ? true : false;
}
/** Converts the input data format to its output format.
* @returns the converted value.
*/
map(input: Input, dependency: Dependency) {
const map = this.options?.map;
if (map) {
return map(input, dependency);
}
return Promise.resolve(input as unknown as Output);
}
/** Checks whether the input type can be converted to the output type.
* @returns `true` if the definition is valid, otherwise `false`.
*/
isValid(input: Input, dependency: Dependency) {
const isValid = this.options?.isValid;
if (isValid) {
return isValid(input, dependency);
}
return Promise.resolve(input ? true : false);
}
}

View File

@ -0,0 +1,375 @@
import { BehaviorSubject, firstValueFrom, of } from "rxjs";
import {
mockAccountServiceWith,
FakeStateProvider,
awaitAsync,
trackEmissions,
} from "../../../../spec";
import { GENERATOR_DISK, KeyDefinition } from "../../../platform/state";
import { UserId } from "../../../types/guid";
import { BufferedKeyDefinition } from "./buffered-key-definition";
import { BufferedState } from "./buffered-state";
const SomeUser = "SomeUser" as UserId;
const accountService = mockAccountServiceWith(SomeUser);
type SomeType = { foo: boolean; bar: boolean };
const SOME_KEY = new KeyDefinition<SomeType>(GENERATOR_DISK, "fooBar", {
deserializer: (jsonValue) => jsonValue as SomeType,
});
const BUFFER_KEY = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
clearOn: [],
});
describe("BufferedState", () => {
describe("state$", function () {
it("reads from the output state", async () => {
const provider = new FakeStateProvider(accountService);
const value = { foo: true, bar: false };
const outputState = provider.getUser(SomeUser, SOME_KEY);
await outputState.update(() => value);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = await firstValueFrom(bufferedState.state$);
expect(result).toEqual(value);
});
it("updates when the output state updates", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
const secondValue = { foo: true, bar: true };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = trackEmissions(bufferedState.state$);
await outputState.update(() => secondValue);
await awaitAsync();
expect(result).toEqual([firstValue, secondValue]);
});
// this test is important for data migrations, which set
// the buffered state without using the `BufferedState` abstraction.
it.each([[null], [undefined]])(
"reads from the output state when the buffered state is '%p'",
async (bufferValue) => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferValue, SomeUser);
const result = await firstValueFrom(bufferedState.state$);
expect(result).toEqual(firstValue);
},
);
// also important for data migrations
it("rolls over pending values from the buffered state immediately by default", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
await outputState.update(() => ({ foo: true, bar: false }));
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const bufferedValue = { foo: true, bar: true };
await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferedValue, SomeUser);
const result = await firstValueFrom(bufferedState.state$);
expect(result).toEqual(bufferedValue);
});
// also important for data migrations
it("reads from the output state when its dependency is false", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const value = { foo: true, bar: false };
await outputState.update(() => value);
const dependency = new BehaviorSubject<boolean>(false).asObservable();
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState, dependency);
await provider.setUserState(BUFFER_KEY.toKeyDefinition(), { foo: true, bar: true }, SomeUser);
const result = await firstValueFrom(bufferedState.state$);
expect(result).toEqual(value);
});
// also important for data migrations
it("overwrites the output state when its dependency emits a truthy value", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const dependency = new BehaviorSubject<boolean>(false);
const bufferedState = new BufferedState(
provider,
BUFFER_KEY,
outputState,
dependency.asObservable(),
);
const bufferedValue = { foo: true, bar: true };
await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferedValue, SomeUser);
const result = trackEmissions(bufferedState.state$);
dependency.next(true);
await awaitAsync();
expect(result).toEqual([firstValue, bufferedValue]);
});
it("overwrites the output state when shouldOverwrite returns a truthy value", async () => {
const bufferedKey = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
shouldOverwrite: () => true,
clearOn: [],
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
await outputState.update(() => ({ foo: true, bar: false }));
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
const bufferedValue = { foo: true, bar: true };
await provider.setUserState(bufferedKey.toKeyDefinition(), bufferedValue, SomeUser);
const result = await firstValueFrom(bufferedState.state$);
expect(result).toEqual(bufferedValue);
});
it("reads from the output state when shouldOverwrite returns a falsy value", async () => {
const bufferedKey = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
shouldOverwrite: () => false,
clearOn: [],
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const value = { foo: true, bar: false };
await outputState.update(() => value);
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
await provider.setUserState(
bufferedKey.toKeyDefinition(),
{ foo: true, bar: true },
SomeUser,
);
const result = await firstValueFrom(bufferedState.state$);
expect(result).toEqual(value);
});
it("replaces the output state when shouldOverwrite transforms its dependency to a truthy value", async () => {
const bufferedKey = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
shouldOverwrite: (dependency) => !dependency,
clearOn: [],
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const dependency = new BehaviorSubject<boolean>(true);
const bufferedState = new BufferedState(
provider,
bufferedKey,
outputState,
dependency.asObservable(),
);
const bufferedValue = { foo: true, bar: true };
await provider.setUserState(bufferedKey.toKeyDefinition(), bufferedValue, SomeUser);
const result = trackEmissions(bufferedState.state$);
dependency.next(false);
await awaitAsync();
expect(result).toEqual([firstValue, bufferedValue]);
});
});
describe("userId", () => {
const AnotherUser = "anotherUser" as UserId;
it.each([[SomeUser], [AnotherUser]])("gets the userId", (userId) => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(userId, SOME_KEY);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = bufferedState.userId;
expect(result).toEqual(userId);
});
});
describe("update", () => {
it("updates state$", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
const secondValue = { foo: true, bar: true };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = trackEmissions(bufferedState.state$);
await bufferedState.update(() => secondValue);
await awaitAsync();
expect(result).toEqual([firstValue, secondValue]);
});
it("respects update options", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
const secondValue = { foo: true, bar: true };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = trackEmissions(bufferedState.state$);
await bufferedState.update(() => secondValue, {
shouldUpdate: (_, latest) => latest,
combineLatestWith: of(false),
});
await awaitAsync();
expect(result).toEqual([firstValue]);
});
});
describe("buffer", () => {
it("updates state$ once per overwrite", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
const secondValue = { foo: true, bar: true };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer(secondValue);
await awaitAsync();
expect(result).toEqual([firstValue, secondValue]);
});
it("emits the output state when its dependency is false", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const dependency = new BehaviorSubject<boolean>(false);
const bufferedState = new BufferedState(
provider,
BUFFER_KEY,
outputState,
dependency.asObservable(),
);
const bufferedValue = { foo: true, bar: true };
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer(bufferedValue);
await awaitAsync();
expect(result).toEqual([firstValue, firstValue]);
});
it("replaces the output state when its dependency becomes true", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const dependency = new BehaviorSubject<boolean>(false);
const bufferedState = new BufferedState(
provider,
BUFFER_KEY,
outputState,
dependency.asObservable(),
);
const bufferedValue = { foo: true, bar: true };
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer(bufferedValue);
dependency.next(true);
await awaitAsync();
expect(result).toEqual([firstValue, firstValue, bufferedValue]);
});
it.each([[null], [undefined]])("ignores `%p`", async (bufferedValue) => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer(bufferedValue);
await awaitAsync();
expect(result).toEqual([firstValue]);
});
it("discards the buffered data when isValid returns false", async () => {
const bufferedKey = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
isValid: () => Promise.resolve(false),
clearOn: [],
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer({ foo: true, bar: true });
await awaitAsync();
expect(result).toEqual([firstValue, firstValue]);
});
it("overwrites the output when isValid returns true", async () => {
const bufferedKey = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
isValid: () => Promise.resolve(true),
clearOn: [],
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
const bufferedValue = { foo: true, bar: true };
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer(bufferedValue);
await awaitAsync();
expect(result).toEqual([firstValue, bufferedValue]);
});
it("maps the buffered data when it overwrites the state", async () => {
const mappedValue = { foo: true, bar: true };
const bufferedKey = new BufferedKeyDefinition<SomeType>(GENERATOR_DISK, "fooBar_buffer", {
deserializer: (jsonValue) => jsonValue as SomeType,
map: () => Promise.resolve(mappedValue),
clearOn: [],
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
const firstValue = { foo: true, bar: false };
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
const result = trackEmissions(bufferedState.state$);
await bufferedState.buffer({ foo: false, bar: false });
await awaitAsync();
expect(result).toEqual([firstValue, mappedValue]);
});
});
});

View File

@ -0,0 +1,144 @@
import { Observable, combineLatest, concatMap, filter, map, of } from "rxjs";
import {
StateProvider,
SingleUserState,
CombinedState,
StateUpdateOptions,
} from "../../../platform/state";
import { BufferedKeyDefinition } from "./buffered-key-definition";
/** Stateful storage that overwrites one state with a buffered state.
* When a overwrite occurs, the input state is automatically deleted.
* @remarks The buffered state can only overwrite non-nullish values. If the
* buffer key contains `null` or `undefined`, it will do nothing.
*/
export class BufferedState<Input, Output, Dependency> implements SingleUserState<Output> {
/**
* Instantiate a buffered state
* @param provider constructs the buffer.
* @param key defines the buffer location.
* @param output updates when a overwrite occurs
* @param dependency$ provides data the buffer depends upon to evaluate and
* transform its data. If this is omitted, then `true` is injected as
* a dependency, which with a default output will trigger a overwrite immediately.
*
* @remarks `dependency$` enables overwrite control during dynamic circumstances,
* such as when a overwrite should occur only if a user key is available.
*/
constructor(
provider: StateProvider,
private key: BufferedKeyDefinition<Input, Output, Dependency>,
private output: SingleUserState<Output>,
dependency$: Observable<Dependency> = null,
) {
this.bufferState = provider.getUser(output.userId, key.toKeyDefinition());
const watching = [
this.bufferState.state$,
this.output.state$,
dependency$ ?? of(true as unknown as Dependency),
] as const;
this.state$ = combineLatest(watching).pipe(
concatMap(async ([input, output, dependency]) => {
const normalized = input ?? null;
const canOverwrite = normalized !== null && key.shouldOverwrite(dependency);
if (canOverwrite) {
await this.updateOutput(dependency);
// prevent duplicate updates by suppressing the update
return [false, output] as const;
}
return [true, output] as const;
}),
filter(([updated]) => updated),
map(([, output]) => output),
);
this.combinedState$ = this.state$.pipe(map((state) => [this.output.userId, state]));
this.bufferState$ = this.bufferState.state$;
}
private bufferState: SingleUserState<Input>;
private async updateOutput(dependency: Dependency) {
// retrieve the latest input value
let input: Input;
await this.bufferState.update((state) => state, {
shouldUpdate: (state) => {
input = state;
return false;
},
});
// bail if this update lost the race with the last update
if (input === null) {
return;
}
// destroy invalid data and bail
if (!(await this.key.isValid(input, dependency))) {
await this.bufferState.update(() => null);
return;
}
// overwrite anything left to the output; the updates need to be awaited with `Promise.all`
// so that `inputState.update(() => null)` runs before `shouldUpdate` reads the value (above).
// This lets the emission from `this.outputState.update` renter the `concatMap`. If the
// awaits run in sequence, it can win the race and cause a double emission.
const output = await this.key.map(input, dependency);
await Promise.all([this.output.update(() => output), this.bufferState.update(() => null)]);
return;
}
/** {@link SingleUserState.userId} */
get userId() {
return this.output.userId;
}
/** Observes changes to the output state. This updates when the output
* state updates, when the buffer is moved to the output, and when `BufferedState.buffer`
* is invoked.
*/
readonly state$: Observable<Output>;
/** {@link SingleUserState.combinedState$} */
readonly combinedState$: Observable<CombinedState<Output>>;
/** Buffers a value state. The buffered state overwrites the output
* state when a subscription occurs.
* @param value the state to roll over. Setting this to `null` or `undefined`
* has no effect.
*/
async buffer(value: Input): Promise<void> {
const normalized = value ?? null;
if (normalized !== null) {
await this.bufferState.update(() => normalized);
}
}
/** The data presently being buffered. This emits the pending value each time
* new buffer data is provided. It emits null when the buffer is empty.
*/
readonly bufferState$: Observable<Input>;
/** Updates the output state.
* @param configureState a callback that returns an updated output
* state. The callback receives the state's present value as its
* first argument and the dependencies listed in `options.combinedLatestWith`
* as its second argument.
* @param options configures how the update is applied. See {@link StateUpdateOptions}.
*/
update<TCombine>(
configureState: (state: Output, dependencies: TCombine) => Output,
options: StateUpdateOptions<Output, TCombine> = null,
): Promise<Output> {
return this.output.update(configureState, options);
}
}