From 38c335d8fbca60003d189ce8d14e542a9d48e093 Mon Sep 17 00:00:00 2001 From: Matt Gibson Date: Mon, 11 Dec 2023 20:32:39 -0500 Subject: [PATCH] Ps/avoid state emit until updated (#7124) * Add a small default time to limit timing failures * Handle subscription race conditions * Add Symbols to tracked emission types This is a bit of a cheat, but Symbols can't be cloned, so we need to nudge them to something we can handle. They are rare enough that anyone hitting this is likely to expect some special handling. * Ref count state listeners to minimize storage activity * Ensure statuses are updated * Remove notes * Use `test` when gramatically more proper * Copy race and subscription improvements to single user * Simplify observer initialization * Correct parameter names * Simplify update promises test we don't accidentally deadlock along the `getFromState` path * Fix save mock * WIP: most tests working * Avoid infinite update loop * Avoid potential deadlocks with awaiting assigned promises We were awaiting a promise assigned in a thenable. It turns out that assignment occurs before all thenables are concatenated, which can cause deadlocks. Likely, these were not showing up in tests because we're using very quick memory storage. * Fix update deadlock test * Add user update tests * Assert no double emit for multiple observers * Add use intent to method name * Ensure new subscriptions receive only newest data TODO: is this worth doing for active user state? * Remove unnecessary design requirement We don't need to await an executing update promise, we can support two emissions as long as the observable is guaranteed to get the new data. * Cleanup await spam * test cleanup option behavior * Remove unnecessary typecast * Throw over coerce for definition options --- libs/common/spec/fake-storage.service.ts | 2 +- libs/common/spec/utils.ts | 6 +- .../default-active-user-state.spec.ts | 351 ++++++++++++++++-- .../default-active-user-state.ts | 197 ++++++---- .../default-global-state.spec.ts | 208 ++++++++++- .../implementations/default-global-state.ts | 140 ++++--- .../default-single-user-state.spec.ts | 229 +++++++++++- .../default-single-user-state.ts | 145 +++++--- .../src/platform/state/key-definition.spec.ts | 31 ++ .../src/platform/state/key-definition.ts | 22 +- 10 files changed, 1126 insertions(+), 205 deletions(-) diff --git a/libs/common/spec/fake-storage.service.ts b/libs/common/spec/fake-storage.service.ts index ba3e461346..7c9e5b3231 100644 --- a/libs/common/spec/fake-storage.service.ts +++ b/libs/common/spec/fake-storage.service.ts @@ -59,7 +59,7 @@ export class FakeStorageService implements AbstractStorageService { return Promise.resolve(this.store[key] != null); } save(key: string, obj: T, options?: StorageOptions): Promise { - this.mock.save(key, options); + this.mock.save(key, obj, options); this.store[key] = obj; this.updatesSubject.next({ key: key, updateType: "save" }); return Promise.resolve(); diff --git a/libs/common/spec/utils.ts b/libs/common/spec/utils.ts index 5053a71c87..ad5907f61d 100644 --- a/libs/common/spec/utils.ts +++ b/libs/common/spec/utils.ts @@ -69,6 +69,10 @@ export function trackEmissions(observable: Observable): T[] { case "boolean": emissions.push(value); break; + case "symbol": + // Cheating types to make symbols work at all + emissions.push(value.toString() as T); + break; default: { emissions.push(clone(value)); } @@ -85,7 +89,7 @@ function clone(value: any): any { } } -export async function awaitAsync(ms = 0) { +export async function awaitAsync(ms = 1) { if (ms < 1) { await Promise.resolve(); } else { diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts index 065f7a8e95..e4a3f80eec 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts @@ -2,7 +2,7 @@ * need to update test environment so trackEmissions works appropriately * @jest-environment ../shared/test.environment.ts */ -import { any, mock } from "jest-mock-extended"; +import { any, anySymbol, mock } from "jest-mock-extended"; import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs"; import { Jsonify } from "type-fest"; @@ -11,7 +11,7 @@ import { FakeStorageService } from "../../../../spec/fake-storage.service"; import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service"; import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; import { UserId } from "../../../types/guid"; -import { KeyDefinition } from "../key-definition"; +import { KeyDefinition, userKeyBuilder } from "../key-definition"; import { StateDefinition } from "../state-definition"; import { DefaultActiveUserState } from "./default-active-user-state"; @@ -32,9 +32,10 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); - +const cleanupDelayMs = 10; const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, + cleanupDelayMs, }); describe("DefaultActiveUserState", () => { @@ -56,10 +57,14 @@ describe("DefaultActiveUserState", () => { ); }); + const makeUserId = (id: string) => { + return id != null ? (`00000000-0000-1000-a000-00000000000${id}` as UserId) : undefined; + }; + const changeActiveUser = async (id: string) => { - const userId = id != null ? `00000000-0000-1000-a000-00000000000${id}` : undefined; + const userId = makeUserId(id); activeAccountSubject.next({ - id: userId as UserId, + id: userId, email: `test${id}@example.com`, name: `Test User ${id}`, status: AuthenticationStatus.Unlocked, @@ -90,7 +95,7 @@ describe("DefaultActiveUserState", () => { const emissions = trackEmissions(userState.state$); // User signs in - changeActiveUser("1"); + await changeActiveUser("1"); await awaitAsync(); // Service does an update @@ -111,17 +116,17 @@ describe("DefaultActiveUserState", () => { expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), + any(), // options ); expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 2, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), + any(), // options ); expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 3, "user_00000000-0000-1000-a000-000000000002_fake_fake", - any(), + any(), // options ); // Should only have saved data for the first user @@ -129,7 +134,8 @@ describe("DefaultActiveUserState", () => { expect(diskStorageService.mock.save).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), + updatedState, + any(), // options ); }); @@ -183,15 +189,17 @@ describe("DefaultActiveUserState", () => { }); it("should not emit a previous users value if that user is no longer active", async () => { + const user1Data: Jsonify = { + date: "2020-09-21T13:14:17.648Z", + array: ["value"], + }; + const user2Data: Jsonify = { + date: "2020-09-21T13:14:17.648Z", + array: [], + }; diskStorageService.internalUpdateStore({ - "user_00000000-0000-1000-a000-000000000001_fake_fake": { - date: "2020-09-21T13:14:17.648Z", - array: ["value"], - } as Jsonify, - "user_00000000-0000-1000-a000-000000000002_fake_fake": { - date: "2020-09-21T13:14:17.648Z", - array: [], - } as Jsonify, + "user_00000000-0000-1000-a000-000000000001_fake_fake": user1Data, + "user_00000000-0000-1000-a000-000000000002_fake_fake": user2Data, }); // This starts one subscription on the observable for tracking emissions throughout @@ -203,7 +211,7 @@ describe("DefaultActiveUserState", () => { // This should always return a value right await const value = await firstValueFrom(userState.state$); - expect(value).toBeTruthy(); + expect(value).toEqual(user1Data); // Make it such that there is no active user await changeActiveUser(undefined); @@ -222,20 +230,34 @@ describe("DefaultActiveUserState", () => { rejectedError = err; }); - expect(resolvedValue).toBeFalsy(); - expect(rejectedError).toBeTruthy(); + expect(resolvedValue).toBeUndefined(); + expect(rejectedError).not.toBeUndefined(); expect(rejectedError.message).toBe("Timeout has occurred"); // We need to figure out if something should be emitted // when there becomes no active user, if we don't want that to emit // this value is correct. - expect(emissions).toHaveLength(2); + expect(emissions).toEqual([user1Data]); + }); + + it("should not emit twice if there are two listeners", async () => { + await changeActiveUser("1"); + const emissions = trackEmissions(userState.state$); + const emissions2 = trackEmissions(userState.state$); + await awaitAsync(); + + expect(emissions).toEqual([ + null, // Initial value + ]); + expect(emissions2).toEqual([ + null, // Initial value + ]); }); describe("update", () => { const newData = { date: new Date(), array: ["test"] }; beforeEach(async () => { - changeActiveUser("1"); + await changeActiveUser("1"); }); it("should save on update", async () => { @@ -315,6 +337,8 @@ describe("DefaultActiveUserState", () => { return initialData; }); + await awaitAsync(); + await userState.update((state, dependencies) => { expect(state).toEqual(initialData); return newData; @@ -329,4 +353,285 @@ describe("DefaultActiveUserState", () => { ]); }); }); + + describe("update races", () => { + const newData = { date: new Date(), array: ["test"] }; + const userId = makeUserId("1"); + + beforeEach(async () => { + await changeActiveUser("1"); + await awaitAsync(); + }); + + test("subscriptions during an update should receive the current and latest", async () => { + const oldData = { date: new Date(2019, 1, 1), array: ["oldValue1"] }; + await userState.update(() => { + return oldData; + }); + const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { + emissions2 = trackEmissions(userState.state$); + await originalSave(key, obj); + }); + + const val = await userState.update(() => { + return newData; + }); + + await awaitAsync(10); + + expect(val).toEqual(newData); + expect(emissions).toEqual([initialData, newData]); + expect(emissions2).toEqual([initialData, newData]); + }); + + test("subscription during an aborted update should receive the last value", async () => { + // Seed with interesting data + const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const val = await userState.update( + (state) => { + return newData; + }, + { + shouldUpdate: () => { + emissions2 = trackEmissions(userState.state$); + return false; + }, + }, + ); + + await awaitAsync(); + + expect(val).toEqual(initialData); + expect(emissions).toEqual([initialData]); + + expect(emissions2).toEqual([initialData]); + }); + + test("updates should wait until previous update is complete", async () => { + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async (key: string, obj: any) => { + let resolved = false; + await Promise.race([ + userState.update(() => { + // deadlocks + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(false); + }) + .mockImplementation((...args) => { + return originalSave(...args); + }); + + await userState.update(() => { + return newData; + }); + }); + + test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { + expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + const val = await userState.update((state) => { + return newData; + }); + + expect(val).toEqual(newData); + const call = diskStorageService.mock.save.mock.calls[0]; + expect(call[0]).toEqual(`user_${userId}_fake_fake`); + expect(call[1]).toEqual(newData); + }); + + it("does not await updates if the active user changes", async () => { + const initialUserId = (await firstValueFrom(accountService.activeAccount$)).id; + expect(initialUserId).toBe(userId); + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async (key: string, obj: any) => { + let resolved = false; + await changeActiveUser("2"); + await Promise.race([ + userState.update(() => { + // should not deadlock because we updated the user + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(true); + }) + .mockImplementation((...args) => { + return originalSave(...args); + }); + + await userState.update(() => { + return newData; + }); + }); + + it("stores updates for users in the correct place when active user changes mid-update", async () => { + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const user2Data = { date: new Date(), array: ["user 2 data"] }; + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async (key: string, obj: any) => { + let resolved = false; + await changeActiveUser("2"); + await Promise.race([ + userState.update(() => { + // should not deadlock because we updated the user + resolved = true; + return user2Data; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(true); + await originalSave(key, obj); + }) + .mockImplementation((...args) => { + return originalSave(...args); + }); + + await userState.update(() => { + return newData; + }); + await awaitAsync(); + + expect(diskStorageService.mock.save).toHaveBeenCalledTimes(2); + const innerCall = diskStorageService.mock.save.mock.calls[0]; + expect(innerCall[0]).toEqual(`user_${makeUserId("2")}_fake_fake`); + expect(innerCall[1]).toEqual(user2Data); + const outerCall = diskStorageService.mock.save.mock.calls[1]; + expect(outerCall[0]).toEqual(`user_${makeUserId("1")}_fake_fake`); + expect(outerCall[1]).toEqual(newData); + }); + }); + + describe("cleanup", () => { + const newData = { date: new Date(), array: ["test"] }; + const userId = makeUserId("1"); + let userKey: string; + + beforeEach(async () => { + await changeActiveUser("1"); + userKey = userKeyBuilder(userId, testKeyDefinition); + }); + + async function assertClean() { + const emissions = trackEmissions(userState["stateSubject"]); + const initial = structuredClone(emissions); + + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + + expect(emissions).toEqual(initial); // no longer listening to storage updates + } + + it("should cleanup after last subscriber", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); // storage updates are behind a promise + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("should not cleanup if there are still subscribers", async () => { + const subscription1 = userState.state$.subscribe(); + const sub2Emissions: TestState[] = []; + const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v)); + await awaitAsync(); // storage updates are behind a promise + + subscription1.unsubscribe(); + + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + expect(userState["subscriberCount"].getValue()).toBe(1); + + // Still be listening to storage updates + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + expect(sub2Emissions).toEqual([null, newData]); + + subscription2.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("can re-initialize after cleanup", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + subscription.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + + diskStorageService.save(userKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([null, newData]); + }); + + it("should not cleanup if a subscriber joins during the cleanup delay", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + await diskStorageService.save(userKey, newData); + await awaitAsync(); + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Do not wait long enough for cleanup + await awaitAsync(cleanupDelayMs / 2); + + expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared + expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + }); + }); }); diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.ts b/libs/common/src/platform/state/implementations/default-active-user-state.ts index 3d36af1d61..ae5c25dcb4 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.ts @@ -4,12 +4,12 @@ import { map, shareReplay, switchMap, - tap, - defer, firstValueFrom, combineLatestWith, filter, timeout, + Subscription, + tap, } from "rxjs"; import { AccountService } from "../../../auth/abstractions/account.service"; @@ -31,13 +31,21 @@ const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultActiveUserState implements ActiveUserState { [activeMarker]: true; private formattedKey$: Observable; + private updatePromise: Promise | null = null; + private storageUpdateSubscription: Subscription; + private activeAccountUpdateSubscription: Subscription; + private subscriberCount = new BehaviorSubject(0); + private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); private stateSubject$ = this.stateSubject.asObservable(); - state$: Observable; + get state$() { + this.stateObservable = this.stateObservable ?? this.initializeObservable(); + return this.stateObservable; + } constructor( protected keyDefinition: KeyDefinition, @@ -51,62 +59,12 @@ export class DefaultActiveUserState implements ActiveUserState { ? userKeyBuilder(account.id, this.keyDefinition) : null, ), + tap(() => { + // We have a new key, so we should forget about previous update promises + this.updatePromise = null; + }), shareReplay({ bufferSize: 1, refCount: false }), ); - - const activeAccountData$ = this.formattedKey$.pipe( - switchMap(async (key) => { - if (key == null) { - return FAKE_DEFAULT; - } - return await getStoredValue( - key, - this.chosenStorageLocation, - this.keyDefinition.deserializer, - ); - }), - // Share the execution - shareReplay({ refCount: false, bufferSize: 1 }), - ); - - const storageUpdates$ = this.chosenStorageLocation.updates$.pipe( - combineLatestWith(this.formattedKey$), - filter(([update, key]) => key !== null && update.key === key), - switchMap(async ([update, key]) => { - if (update.updateType === "remove") { - return null; - } - const data = await getStoredValue( - key, - this.chosenStorageLocation, - this.keyDefinition.deserializer, - ); - return data; - }), - ); - - // Whomever subscribes to this data, should be notified of updated data - // if someone calls my update() method, or the active user changes. - this.state$ = defer(() => { - const accountChangeSubscription = activeAccountData$.subscribe((data) => { - this.stateSubject.next(data); - }); - const storageUpdateSubscription = storageUpdates$.subscribe((data) => { - this.stateSubject.next(data); - }); - - return this.stateSubject$.pipe( - tap({ - complete: () => { - accountChangeSubscription.unsubscribe(); - storageUpdateSubscription.unsubscribe(); - }, - }), - ); - }) - // I fake the generic here because I am filtering out the other union type - // and this makes it so that typescript understands the true type - .pipe(filter((value) => value != FAKE_DEFAULT)); } async update( @@ -114,8 +72,34 @@ export class DefaultActiveUserState implements ActiveUserState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); + try { + if (this.updatePromise != null) { + await this.updatePromise; + } + this.updatePromise = this.internalUpdate(configureState, options); + const newState = await this.updatePromise; + return newState; + } finally { + this.updatePromise = null; + } + } + + // TODO: this should be removed + async getFromState(): Promise { const key = await this.createKey(); - const currentState = await this.getGuaranteedState(key); + return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); + } + + createDerived(converter: Converter): DerivedUserState { + return new DefaultDerivedUserState(converter, this.encryptService, this); + } + + private async internalUpdate( + configureState: (state: T, dependency: TCombine) => T, + options: StateUpdateOptions, + ) { + const key = await this.createKey(); + const currentState = await this.getStateForUpdate(key); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -130,13 +114,53 @@ export class DefaultActiveUserState implements ActiveUserState { return newState; } - async getFromState(): Promise { - const key = await this.createKey(); - return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); - } + private initializeObservable() { + this.storageUpdateSubscription = this.chosenStorageLocation.updates$ + .pipe( + combineLatestWith(this.formattedKey$), + filter(([update, key]) => key !== null && update.key === key), + switchMap(async ([update, key]) => { + if (update.updateType === "remove") { + return null; + } + return await this.getState(key); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); - createDerived(converter: Converter): DerivedUserState { - return new DefaultDerivedUserState(converter, this.encryptService, this); + this.activeAccountUpdateSubscription = this.formattedKey$ + .pipe( + switchMap(async (key) => { + if (key == null) { + return FAKE_DEFAULT; + } + return await this.getState(key); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); + + this.subscriberCount.subscribe((count) => { + if (count === 0 && this.stateObservable != null) { + this.triggerCleanup(); + } + }); + + return new Observable((subscriber) => { + this.incrementSubscribers(); + + const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); + subscriber.unsubscribe = () => { + this.decrementSubscribers(); + prevUnsubscribe(); + }; + + return this.stateSubject + .pipe( + // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. + filter((i) => i !== FAKE_DEFAULT), + ) + .subscribe(subscriber); + }); } protected async createKey(): Promise { @@ -147,22 +171,47 @@ export class DefaultActiveUserState implements ActiveUserState { return formattedKey; } - protected async getGuaranteedState(key: string) { + /** For use in update methods, does not wait for update to complete before yielding state. + * The expectation is that that await is already done + */ + protected async getStateForUpdate(key: string) { const currentValue = this.stateSubject.getValue(); - return currentValue === FAKE_DEFAULT ? await this.seedInitial(key) : currentValue; + return currentValue === FAKE_DEFAULT + ? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer) + : currentValue; } - private async seedInitial(key: string): Promise { - const value = await getStoredValue( - key, - this.chosenStorageLocation, - this.keyDefinition.deserializer, - ); - this.stateSubject.next(value); - return value; + /** To be used in observables. Awaits updates to ensure they are complete */ + private async getState(key: string): Promise { + if (this.updatePromise != null) { + await this.updatePromise; + } + return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); } protected saveToStorage(key: string, data: T): Promise { return this.chosenStorageLocation.save(key, data); } + + private incrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value + 1); + } + + private decrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value - 1); + } + + private triggerCleanup() { + setTimeout(() => { + if (this.subscriberCount.value === 0) { + this.updatePromise = null; + this.storageUpdateSubscription?.unsubscribe(); + this.activeAccountUpdateSubscription?.unsubscribe(); + this.stateObservable = null; + this.subscriberCount.complete(); + this.subscriberCount = new BehaviorSubject(0); + this.stateSubject.next(FAKE_DEFAULT); + } + }, this.keyDefinition.cleanupDelayMs); + } } diff --git a/libs/common/src/platform/state/implementations/default-global-state.spec.ts b/libs/common/src/platform/state/implementations/default-global-state.spec.ts index ae6cd1adbf..f9f95c652d 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.spec.ts @@ -3,6 +3,7 @@ * @jest-environment ../shared/test.environment.ts */ +import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -28,9 +29,10 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); - +const cleanupDelayMs = 10; const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, + cleanupDelayMs, }); const globalKey = globalKeyBuilder(testKeyDefinition); @@ -79,6 +81,19 @@ describe("DefaultGlobalState", () => { expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined); expect(state).toBeTruthy(); }); + + it("should not emit twice if there are two listeners", async () => { + const emissions = trackEmissions(globalState.state$); + const emissions2 = trackEmissions(globalState.state$); + await awaitAsync(); + + expect(emissions).toEqual([ + null, // Initial value + ]); + expect(emissions2).toEqual([ + null, // Initial value + ]); + }); }); describe("update", () => { @@ -133,6 +148,7 @@ describe("DefaultGlobalState", () => { it("should not update if shouldUpdate returns false", async () => { const emissions = trackEmissions(globalState.state$); + await awaitAsync(); // storage updates are behind a promise const result = await globalState.update( (state) => { @@ -198,4 +214,194 @@ describe("DefaultGlobalState", () => { expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); }); }); + + describe("update races", () => { + test("subscriptions during an update should receive the current and latest data", async () => { + const oldData = { date: new Date(2019, 1, 1) }; + await globalState.update(() => { + return oldData; + }); + const initialData = { date: new Date(2020, 1, 1) }; + await globalState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(globalState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { + emissions2 = trackEmissions(globalState.state$); + await originalSave(key, obj); + }); + + const val = await globalState.update(() => { + return newData; + }); + + await awaitAsync(10); + + expect(val).toEqual(newData); + expect(emissions).toEqual([initialData, newData]); + expect(emissions2).toEqual([initialData, newData]); + }); + + test("subscription during an aborted update should receive the last value", async () => { + // Seed with interesting data + const initialData = { date: new Date(2020, 1, 1) }; + await globalState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(globalState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const val = await globalState.update( + () => { + return newData; + }, + { + shouldUpdate: () => { + emissions2 = trackEmissions(globalState.state$); + return false; + }, + }, + ); + + await awaitAsync(); + + expect(val).toEqual(initialData); + expect(emissions).toEqual([initialData]); + + expect(emissions2).toEqual([initialData]); + }); + + test("updates should wait until previous update is complete", async () => { + trackEmissions(globalState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async () => { + let resolved = false; + await Promise.race([ + globalState.update(() => { + // deadlocks + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(false); + }) + .mockImplementation(originalSave); + + await globalState.update((state) => { + return newData; + }); + }); + + test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { + expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + const val = await globalState.update((state) => { + return newData; + }); + + expect(val).toEqual(newData); + const call = diskStorageService.mock.save.mock.calls[0]; + expect(call[0]).toEqual("global_fake_fake"); + expect(call[1]).toEqual(newData); + }); + }); + + describe("cleanup", () => { + async function assertClean() { + const emissions = trackEmissions(globalState["stateSubject"]); + const initial = structuredClone(emissions); + + diskStorageService.save(globalKey, newData); + await awaitAsync(); // storage updates are behind a promise + + expect(emissions).toEqual(initial); // no longer listening to storage updates + } + + it("should cleanup after last subscriber", async () => { + const subscription = globalState.state$.subscribe(); + await awaitAsync(); // storage updates are behind a promise + + subscription.unsubscribe(); + expect(globalState["subscriberCount"].getValue()).toBe(0); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("should not cleanup if there are still subscribers", async () => { + const subscription1 = globalState.state$.subscribe(); + const sub2Emissions: TestState[] = []; + const subscription2 = globalState.state$.subscribe((v) => sub2Emissions.push(v)); + await awaitAsync(); // storage updates are behind a promise + + subscription1.unsubscribe(); + + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + expect(globalState["subscriberCount"].getValue()).toBe(1); + + // Still be listening to storage updates + diskStorageService.save(globalKey, newData); + await awaitAsync(); // storage updates are behind a promise + expect(sub2Emissions).toEqual([null, newData]); + + subscription2.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("can re-initialize after cleanup", async () => { + const subscription = globalState.state$.subscribe(); + await awaitAsync(); + + subscription.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + const emissions = trackEmissions(globalState.state$); + await awaitAsync(); + + diskStorageService.save(globalKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([null, newData]); + }); + + it("should not cleanup if a subscriber joins during the cleanup delay", async () => { + const subscription = globalState.state$.subscribe(); + await awaitAsync(); + + await diskStorageService.save(globalKey, newData); + await awaitAsync(); + + subscription.unsubscribe(); + expect(globalState["subscriberCount"].getValue()).toBe(0); + // Do not wait long enough for cleanup + await awaitAsync(cleanupDelayMs / 2); + + expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared + expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + }); + }); }); diff --git a/libs/common/src/platform/state/implementations/default-global-state.ts b/libs/common/src/platform/state/implementations/default-global-state.ts index 8e08717f72..39430799a8 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.ts @@ -1,12 +1,10 @@ import { BehaviorSubject, Observable, - defer, + Subscription, filter, firstValueFrom, - shareReplay, switchMap, - tap, timeout, } from "rxjs"; @@ -23,54 +21,25 @@ const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultGlobalState implements GlobalState { private storageKey: string; + private updatePromise: Promise | null = null; + private storageUpdateSubscription: Subscription; + private subscriberCount = new BehaviorSubject(0); + private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); - state$: Observable; + get state$() { + this.stateObservable = this.stateObservable ?? this.initializeObservable(); + return this.stateObservable; + } constructor( private keyDefinition: KeyDefinition, private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = globalKeyBuilder(this.keyDefinition); - - const storageUpdates$ = this.chosenLocation.updates$.pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await getStoredValue( - this.storageKey, - this.chosenLocation, - this.keyDefinition.deserializer, - ); - }), - shareReplay({ bufferSize: 1, refCount: false }), - ); - - this.state$ = defer(() => { - const storageUpdateSubscription = storageUpdates$.subscribe((value) => { - this.stateSubject.next(value); - }); - - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return this.stateSubject.pipe( - tap({ - complete: () => { - storageUpdateSubscription.unsubscribe(); - }, - }), - ); - }).pipe( - shareReplay({ refCount: false, bufferSize: 1 }), - filter((i) => i != FAKE_DEFAULT), - ); } async update( @@ -78,7 +47,24 @@ export class DefaultGlobalState implements GlobalState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); - const currentState = await this.getGuaranteedState(); + if (this.updatePromise != null) { + await this.updatePromise; + } + + try { + this.updatePromise = this.internalUpdate(configureState, options); + const newState = await this.updatePromise; + return newState; + } finally { + this.updatePromise = null; + } + } + + private async internalUpdate( + configureState: (state: T, dependency: TCombine) => T, + options: StateUpdateOptions, + ): Promise { + const currentState = await this.getStateForUpdate(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -93,16 +79,86 @@ export class DefaultGlobalState implements GlobalState { return newState; } - private async getGuaranteedState() { + private initializeObservable() { + this.storageUpdateSubscription = this.chosenLocation.updates$ + .pipe( + filter((update) => update.key === this.storageKey), + switchMap(async (update) => { + if (update.updateType === "remove") { + return null; + } + return await this.getFromState(); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); + + this.subscriberCount.subscribe((count) => { + if (count === 0 && this.stateObservable != null) { + this.triggerCleanup(); + } + }); + + // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to + // trigger populating it immediately. + this.getFromState().then((s) => { + this.stateSubject.next(s); + }); + + return new Observable((subscriber) => { + this.incrementSubscribers(); + + const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); + subscriber.unsubscribe = () => { + this.decrementSubscribers(); + prevUnsubscribe(); + }; + + return this.stateSubject + .pipe( + // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. + filter((i) => i != FAKE_DEFAULT), + ) + .subscribe(subscriber); + }); + } + + /** For use in update methods, does not wait for update to complete before yielding state. + * The expectation is that that await is already done + */ + private async getStateForUpdate() { const currentValue = this.stateSubject.getValue(); return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; } async getFromState(): Promise { + if (this.updatePromise != null) { + return await this.updatePromise; + } return await getStoredValue( this.storageKey, this.chosenLocation, this.keyDefinition.deserializer, ); } + + private incrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value + 1); + } + + private decrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value - 1); + } + + private triggerCleanup() { + setTimeout(() => { + if (this.subscriberCount.value === 0) { + this.updatePromise = null; + this.storageUpdateSubscription.unsubscribe(); + this.stateObservable = null; + this.subscriberCount.complete(); + this.subscriberCount = new BehaviorSubject(0); + this.stateSubject.next(FAKE_DEFAULT); + } + }, this.keyDefinition.cleanupDelayMs); + } } diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts index a25ee863e6..1c24c5f48c 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts @@ -3,6 +3,7 @@ * @jest-environment ../shared/test.environment.ts */ +import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -30,21 +31,22 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); - +const cleanupDelayMs = 10; const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, + cleanupDelayMs, }); const userId = Utils.newGuid() as UserId; const userKey = userKeyBuilder(userId, testKeyDefinition); describe("DefaultSingleUserState", () => { let diskStorageService: FakeStorageService; - let globalState: DefaultSingleUserState; + let userState: DefaultSingleUserState; const newData = { date: new Date() }; beforeEach(() => { diskStorageService = new FakeStorageService(); - globalState = new DefaultSingleUserState( + userState = new DefaultSingleUserState( userId, testKeyDefinition, null, // Not testing anything with encrypt service @@ -58,7 +60,7 @@ describe("DefaultSingleUserState", () => { describe("state$", () => { it("should emit when storage updates", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await diskStorageService.save(userKey, newData); await awaitAsync(); @@ -69,7 +71,7 @@ describe("DefaultSingleUserState", () => { }); it("should not emit when update key does not match", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await diskStorageService.save("wrong_key", newData); expect(emissions).toHaveLength(0); @@ -82,7 +84,7 @@ describe("DefaultSingleUserState", () => { }); diskStorageService.internalUpdateStore(initialStorage); - const state = await firstValueFrom(globalState.state$); + const state = await firstValueFrom(userState.state$); expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1); expect(diskStorageService.mock.get).toHaveBeenCalledWith( `user_${userId}_fake_fake`, @@ -94,7 +96,7 @@ describe("DefaultSingleUserState", () => { describe("update", () => { it("should save on update", async () => { - const result = await globalState.update((state) => { + const result = await userState.update((state) => { return newData; }); @@ -103,10 +105,10 @@ describe("DefaultSingleUserState", () => { }); it("should emit once per update", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise - await globalState.update((state) => { + await userState.update((state) => { return newData; }); @@ -119,12 +121,12 @@ describe("DefaultSingleUserState", () => { }); it("should provided combined dependencies", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise const combinedDependencies = { date: new Date() }; - await globalState.update( + await userState.update( (state, dependencies) => { expect(dependencies).toEqual(combinedDependencies); return newData; @@ -143,9 +145,10 @@ describe("DefaultSingleUserState", () => { }); it("should not update if shouldUpdate returns false", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise - const result = await globalState.update( + const result = await userState.update( (state) => { return newData; }, @@ -160,18 +163,18 @@ describe("DefaultSingleUserState", () => { }); it("should provide the update callback with the current State", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise // Seed with interesting data const initialData = { date: new Date(2020, 1, 1) }; - await globalState.update((state, dependencies) => { + await userState.update((state, dependencies) => { return initialData; }); await awaitAsync(); - await globalState.update((state) => { + await userState.update((state) => { expect(state).toEqual(initialData); return newData; }); @@ -193,14 +196,14 @@ describe("DefaultSingleUserState", () => { initialStorage[userKey] = initialState; diskStorageService.internalUpdateStore(initialStorage); - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise const newState = { ...initialState, date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1), }; - const actual = await globalState.update((existingState) => newState); + const actual = await userState.update((existingState) => newState); await awaitAsync(); @@ -209,4 +212,194 @@ describe("DefaultSingleUserState", () => { expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); }); }); + + describe("update races", () => { + test("subscriptions during an update should receive the current and latest data", async () => { + const oldData = { date: new Date(2019, 1, 1) }; + await userState.update(() => { + return oldData; + }); + const initialData = { date: new Date(2020, 1, 1) }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { + emissions2 = trackEmissions(userState.state$); + await originalSave(key, obj); + }); + + const val = await userState.update(() => { + return newData; + }); + + await awaitAsync(10); + + expect(val).toEqual(newData); + expect(emissions).toEqual([initialData, newData]); + expect(emissions2).toEqual([initialData, newData]); + }); + + test("subscription during an aborted update should receive the last value", async () => { + // Seed with interesting data + const initialData = { date: new Date(2020, 1, 1) }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const val = await userState.update( + (state) => { + return newData; + }, + { + shouldUpdate: () => { + emissions2 = trackEmissions(userState.state$); + return false; + }, + }, + ); + + await awaitAsync(); + + expect(val).toEqual(initialData); + expect(emissions).toEqual([initialData]); + + expect(emissions2).toEqual([initialData]); + }); + + test("updates should wait until previous update is complete", async () => { + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async () => { + let resolved = false; + await Promise.race([ + userState.update(() => { + // deadlocks + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(false); + }) + .mockImplementation(originalSave); + + await userState.update((state) => { + return newData; + }); + }); + + test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { + expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + const val = await userState.update((state) => { + return newData; + }); + + expect(val).toEqual(newData); + const call = diskStorageService.mock.save.mock.calls[0]; + expect(call[0]).toEqual(`user_${userId}_fake_fake`); + expect(call[1]).toEqual(newData); + }); + }); + + describe("cleanup", () => { + async function assertClean() { + const emissions = trackEmissions(userState["stateSubject"]); + const initial = structuredClone(emissions); + + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + + expect(emissions).toEqual(initial); // no longer listening to storage updates + } + + it("should cleanup after last subscriber", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); // storage updates are behind a promise + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("should not cleanup if there are still subscribers", async () => { + const subscription1 = userState.state$.subscribe(); + const sub2Emissions: TestState[] = []; + const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v)); + await awaitAsync(); // storage updates are behind a promise + + subscription1.unsubscribe(); + + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + expect(userState["subscriberCount"].getValue()).toBe(1); + + // Still be listening to storage updates + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + expect(sub2Emissions).toEqual([null, newData]); + + subscription2.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("can re-initialize after cleanup", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + subscription.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + + diskStorageService.save(userKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([null, newData]); + }); + + it("should not cleanup if a subscriber joins during the cleanup delay", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + await diskStorageService.save(userKey, newData); + await awaitAsync(); + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Do not wait long enough for cleanup + await awaitAsync(cleanupDelayMs / 2); + + expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared + expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + }); + }); }); diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.ts b/libs/common/src/platform/state/implementations/default-single-user-state.ts index 46fa00ffb3..0e9cacae51 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.ts @@ -1,12 +1,10 @@ import { BehaviorSubject, Observable, - defer, + Subscription, filter, firstValueFrom, - shareReplay, switchMap, - tap, timeout, } from "rxjs"; @@ -23,16 +21,24 @@ import { Converter, SingleUserState } from "../user-state"; import { DefaultDerivedUserState } from "./default-derived-state"; import { getStoredValue } from "./util"; + const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultSingleUserState implements SingleUserState { private storageKey: string; + private updatePromise: Promise | null = null; + private storageUpdateSubscription: Subscription; + private subscriberCount = new BehaviorSubject(0); + private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); - state$: Observable; + get state$() { + this.stateObservable = this.stateObservable ?? this.initializeObservable(); + return this.stateObservable; + } constructor( readonly userId: UserId, @@ -41,42 +47,6 @@ export class DefaultSingleUserState implements SingleUserState { private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = userKeyBuilder(this.userId, this.keyDefinition); - - const storageUpdates$ = this.chosenLocation.updates$.pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await getStoredValue( - this.storageKey, - this.chosenLocation, - this.keyDefinition.deserializer, - ); - }), - shareReplay({ bufferSize: 1, refCount: false }), - ); - - this.state$ = defer(() => { - const storageUpdateSubscription = storageUpdates$.subscribe((value) => { - this.stateSubject.next(value); - }); - - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return this.stateSubject.pipe( - tap({ - complete: () => { - storageUpdateSubscription.unsubscribe(); - }, - }), - ); - }).pipe( - shareReplay({ refCount: false, bufferSize: 1 }), - filter((i) => i != FAKE_DEFAULT), - ); } async update( @@ -84,7 +54,28 @@ export class DefaultSingleUserState implements SingleUserState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); - const currentState = await this.getGuaranteedState(); + if (this.updatePromise != null) { + await this.updatePromise; + } + + try { + this.updatePromise = this.internalUpdate(configureState, options); + const newState = await this.updatePromise; + return newState; + } finally { + this.updatePromise = null; + } + } + + createDerived(converter: Converter): DerivedUserState { + return new DefaultDerivedUserState(converter, this.encryptService, this); + } + + private async internalUpdate( + configureState: (state: T, dependency: TCombine) => T, + options: StateUpdateOptions, + ): Promise { + const currentState = await this.getStateForUpdate(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -99,20 +90,86 @@ export class DefaultSingleUserState implements SingleUserState { return newState; } - createDerived(converter: Converter): DerivedUserState { - return new DefaultDerivedUserState(converter, this.encryptService, this); + private initializeObservable() { + this.storageUpdateSubscription = this.chosenLocation.updates$ + .pipe( + filter((update) => update.key === this.storageKey), + switchMap(async (update) => { + if (update.updateType === "remove") { + return null; + } + return await this.getFromState(); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); + + this.subscriberCount.subscribe((count) => { + if (count === 0 && this.stateObservable != null) { + this.triggerCleanup(); + } + }); + + // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to + // trigger populating it immediately. + this.getFromState().then((s) => { + this.stateSubject.next(s); + }); + + return new Observable((subscriber) => { + this.incrementSubscribers(); + + const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); + subscriber.unsubscribe = () => { + this.decrementSubscribers(); + prevUnsubscribe(); + }; + + return this.stateSubject + .pipe( + // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. + filter((i) => i != FAKE_DEFAULT), + ) + .subscribe(subscriber); + }); } - private async getGuaranteedState() { + /** For use in update methods, does not wait for update to complete before yielding state. + * The expectation is that that await is already done + */ + private async getStateForUpdate() { const currentValue = this.stateSubject.getValue(); return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; } async getFromState(): Promise { + if (this.updatePromise != null) { + return await this.updatePromise; + } return await getStoredValue( this.storageKey, this.chosenLocation, this.keyDefinition.deserializer, ); } + + private incrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value + 1); + } + + private decrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value - 1); + } + + private triggerCleanup() { + setTimeout(() => { + if (this.subscriberCount.value === 0) { + this.updatePromise = null; + this.storageUpdateSubscription.unsubscribe(); + this.stateObservable = null; + this.subscriberCount.complete(); + this.subscriberCount = new BehaviorSubject(0); + this.stateSubject.next(FAKE_DEFAULT); + } + }, this.keyDefinition.cleanupDelayMs); + } } diff --git a/libs/common/src/platform/state/key-definition.spec.ts b/libs/common/src/platform/state/key-definition.spec.ts index cbb1e49a9a..ee926bccd8 100644 --- a/libs/common/src/platform/state/key-definition.spec.ts +++ b/libs/common/src/platform/state/key-definition.spec.ts @@ -18,6 +18,37 @@ describe("KeyDefinition", () => { }); }); + describe("cleanupDelayMs", () => { + it("defaults to 1000ms", () => { + const keyDefinition = new KeyDefinition(fakeStateDefinition, "fake", { + deserializer: (value) => value, + }); + + expect(keyDefinition).toBeTruthy(); + expect(keyDefinition.cleanupDelayMs).toBe(1000); + }); + + it("can be overridden", () => { + const keyDefinition = new KeyDefinition(fakeStateDefinition, "fake", { + deserializer: (value) => value, + cleanupDelayMs: 500, + }); + + expect(keyDefinition).toBeTruthy(); + expect(keyDefinition.cleanupDelayMs).toBe(500); + }); + + it.each([0, -1])("throws on 0 or negative (%s)", (testValue: number) => { + expect( + () => + new KeyDefinition(fakeStateDefinition, "fake", { + deserializer: (value) => value, + cleanupDelayMs: testValue, + }), + ).toThrow(); + }); + }); + describe("record", () => { it("runs custom deserializer for each record value", () => { const recordDefinition = KeyDefinition.record(fakeStateDefinition, "fake", { diff --git a/libs/common/src/platform/state/key-definition.ts b/libs/common/src/platform/state/key-definition.ts index db65740388..9989bf37a2 100644 --- a/libs/common/src/platform/state/key-definition.ts +++ b/libs/common/src/platform/state/key-definition.ts @@ -19,6 +19,11 @@ type KeyDefinitionOptions = { * @returns The fully typed version of your state. */ readonly deserializer: (jsonValue: Jsonify) => T; + /** + * The number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed. + * Defaults to 1000ms. + */ + readonly cleanupDelayMs?: number; }; /** @@ -42,8 +47,12 @@ export class KeyDefinition { private readonly options: KeyDefinitionOptions, ) { if (options.deserializer == null) { + throw new Error(`'deserializer' is a required property on key ${this.errorKeyName}`); + } + + if (options.cleanupDelayMs <= 0) { throw new Error( - `'deserializer' is a required property on key ${stateDefinition.name} > ${key}`, + `'cleanupDelayMs' must be greater than 0. Value of ${options.cleanupDelayMs} passed to key ${this.errorKeyName} `, ); } } @@ -55,6 +64,13 @@ export class KeyDefinition { return this.options.deserializer; } + /** + * Gets the number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed. + */ + get cleanupDelayMs() { + return this.options.cleanupDelayMs < 0 ? 0 : this.options.cleanupDelayMs ?? 1000; + } + /** * Creates a {@link KeyDefinition} for state that is an array. * @param stateDefinition The state definition to be added to the KeyDefinition @@ -137,6 +153,10 @@ export class KeyDefinition { ? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}` : `${scope}_${this.stateDefinition.name}_${this.key}`; } + + private get errorKeyName() { + return `${this.stateDefinition.name} > ${this.key}`; + } } export type StorageKey = Opaque;