diff --git a/libs/common/spec/fake-storage.service.ts b/libs/common/spec/fake-storage.service.ts index ba3e461346..7c9e5b3231 100644 --- a/libs/common/spec/fake-storage.service.ts +++ b/libs/common/spec/fake-storage.service.ts @@ -59,7 +59,7 @@ export class FakeStorageService implements AbstractStorageService { return Promise.resolve(this.store[key] != null); } save(key: string, obj: T, options?: StorageOptions): Promise { - this.mock.save(key, options); + this.mock.save(key, obj, options); this.store[key] = obj; this.updatesSubject.next({ key: key, updateType: "save" }); return Promise.resolve(); diff --git a/libs/common/spec/utils.ts b/libs/common/spec/utils.ts index 5053a71c87..ad5907f61d 100644 --- a/libs/common/spec/utils.ts +++ b/libs/common/spec/utils.ts @@ -69,6 +69,10 @@ export function trackEmissions(observable: Observable): T[] { case "boolean": emissions.push(value); break; + case "symbol": + // Cheating types to make symbols work at all + emissions.push(value.toString() as T); + break; default: { emissions.push(clone(value)); } @@ -85,7 +89,7 @@ function clone(value: any): any { } } -export async function awaitAsync(ms = 0) { +export async function awaitAsync(ms = 1) { if (ms < 1) { await Promise.resolve(); } else { diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts index 065f7a8e95..e4a3f80eec 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts @@ -2,7 +2,7 @@ * need to update test environment so trackEmissions works appropriately * @jest-environment ../shared/test.environment.ts */ -import { any, mock } from "jest-mock-extended"; +import { any, anySymbol, mock } from "jest-mock-extended"; import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs"; import { Jsonify } from "type-fest"; @@ -11,7 +11,7 @@ import { FakeStorageService } from "../../../../spec/fake-storage.service"; import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service"; import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; import { UserId } from "../../../types/guid"; -import { KeyDefinition } from "../key-definition"; +import { KeyDefinition, userKeyBuilder } from "../key-definition"; import { StateDefinition } from "../state-definition"; import { DefaultActiveUserState } from "./default-active-user-state"; @@ -32,9 +32,10 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); - +const cleanupDelayMs = 10; const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, + cleanupDelayMs, }); describe("DefaultActiveUserState", () => { @@ -56,10 +57,14 @@ describe("DefaultActiveUserState", () => { ); }); + const makeUserId = (id: string) => { + return id != null ? (`00000000-0000-1000-a000-00000000000${id}` as UserId) : undefined; + }; + const changeActiveUser = async (id: string) => { - const userId = id != null ? `00000000-0000-1000-a000-00000000000${id}` : undefined; + const userId = makeUserId(id); activeAccountSubject.next({ - id: userId as UserId, + id: userId, email: `test${id}@example.com`, name: `Test User ${id}`, status: AuthenticationStatus.Unlocked, @@ -90,7 +95,7 @@ describe("DefaultActiveUserState", () => { const emissions = trackEmissions(userState.state$); // User signs in - changeActiveUser("1"); + await changeActiveUser("1"); await awaitAsync(); // Service does an update @@ -111,17 +116,17 @@ describe("DefaultActiveUserState", () => { expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), + any(), // options ); expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 2, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), + any(), // options ); expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 3, "user_00000000-0000-1000-a000-000000000002_fake_fake", - any(), + any(), // options ); // Should only have saved data for the first user @@ -129,7 +134,8 @@ describe("DefaultActiveUserState", () => { expect(diskStorageService.mock.save).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), + updatedState, + any(), // options ); }); @@ -183,15 +189,17 @@ describe("DefaultActiveUserState", () => { }); it("should not emit a previous users value if that user is no longer active", async () => { + const user1Data: Jsonify = { + date: "2020-09-21T13:14:17.648Z", + array: ["value"], + }; + const user2Data: Jsonify = { + date: "2020-09-21T13:14:17.648Z", + array: [], + }; diskStorageService.internalUpdateStore({ - "user_00000000-0000-1000-a000-000000000001_fake_fake": { - date: "2020-09-21T13:14:17.648Z", - array: ["value"], - } as Jsonify, - "user_00000000-0000-1000-a000-000000000002_fake_fake": { - date: "2020-09-21T13:14:17.648Z", - array: [], - } as Jsonify, + "user_00000000-0000-1000-a000-000000000001_fake_fake": user1Data, + "user_00000000-0000-1000-a000-000000000002_fake_fake": user2Data, }); // This starts one subscription on the observable for tracking emissions throughout @@ -203,7 +211,7 @@ describe("DefaultActiveUserState", () => { // This should always return a value right await const value = await firstValueFrom(userState.state$); - expect(value).toBeTruthy(); + expect(value).toEqual(user1Data); // Make it such that there is no active user await changeActiveUser(undefined); @@ -222,20 +230,34 @@ describe("DefaultActiveUserState", () => { rejectedError = err; }); - expect(resolvedValue).toBeFalsy(); - expect(rejectedError).toBeTruthy(); + expect(resolvedValue).toBeUndefined(); + expect(rejectedError).not.toBeUndefined(); expect(rejectedError.message).toBe("Timeout has occurred"); // We need to figure out if something should be emitted // when there becomes no active user, if we don't want that to emit // this value is correct. - expect(emissions).toHaveLength(2); + expect(emissions).toEqual([user1Data]); + }); + + it("should not emit twice if there are two listeners", async () => { + await changeActiveUser("1"); + const emissions = trackEmissions(userState.state$); + const emissions2 = trackEmissions(userState.state$); + await awaitAsync(); + + expect(emissions).toEqual([ + null, // Initial value + ]); + expect(emissions2).toEqual([ + null, // Initial value + ]); }); describe("update", () => { const newData = { date: new Date(), array: ["test"] }; beforeEach(async () => { - changeActiveUser("1"); + await changeActiveUser("1"); }); it("should save on update", async () => { @@ -315,6 +337,8 @@ describe("DefaultActiveUserState", () => { return initialData; }); + await awaitAsync(); + await userState.update((state, dependencies) => { expect(state).toEqual(initialData); return newData; @@ -329,4 +353,285 @@ describe("DefaultActiveUserState", () => { ]); }); }); + + describe("update races", () => { + const newData = { date: new Date(), array: ["test"] }; + const userId = makeUserId("1"); + + beforeEach(async () => { + await changeActiveUser("1"); + await awaitAsync(); + }); + + test("subscriptions during an update should receive the current and latest", async () => { + const oldData = { date: new Date(2019, 1, 1), array: ["oldValue1"] }; + await userState.update(() => { + return oldData; + }); + const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { + emissions2 = trackEmissions(userState.state$); + await originalSave(key, obj); + }); + + const val = await userState.update(() => { + return newData; + }); + + await awaitAsync(10); + + expect(val).toEqual(newData); + expect(emissions).toEqual([initialData, newData]); + expect(emissions2).toEqual([initialData, newData]); + }); + + test("subscription during an aborted update should receive the last value", async () => { + // Seed with interesting data + const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const val = await userState.update( + (state) => { + return newData; + }, + { + shouldUpdate: () => { + emissions2 = trackEmissions(userState.state$); + return false; + }, + }, + ); + + await awaitAsync(); + + expect(val).toEqual(initialData); + expect(emissions).toEqual([initialData]); + + expect(emissions2).toEqual([initialData]); + }); + + test("updates should wait until previous update is complete", async () => { + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async (key: string, obj: any) => { + let resolved = false; + await Promise.race([ + userState.update(() => { + // deadlocks + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(false); + }) + .mockImplementation((...args) => { + return originalSave(...args); + }); + + await userState.update(() => { + return newData; + }); + }); + + test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { + expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + const val = await userState.update((state) => { + return newData; + }); + + expect(val).toEqual(newData); + const call = diskStorageService.mock.save.mock.calls[0]; + expect(call[0]).toEqual(`user_${userId}_fake_fake`); + expect(call[1]).toEqual(newData); + }); + + it("does not await updates if the active user changes", async () => { + const initialUserId = (await firstValueFrom(accountService.activeAccount$)).id; + expect(initialUserId).toBe(userId); + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async (key: string, obj: any) => { + let resolved = false; + await changeActiveUser("2"); + await Promise.race([ + userState.update(() => { + // should not deadlock because we updated the user + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(true); + }) + .mockImplementation((...args) => { + return originalSave(...args); + }); + + await userState.update(() => { + return newData; + }); + }); + + it("stores updates for users in the correct place when active user changes mid-update", async () => { + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const user2Data = { date: new Date(), array: ["user 2 data"] }; + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async (key: string, obj: any) => { + let resolved = false; + await changeActiveUser("2"); + await Promise.race([ + userState.update(() => { + // should not deadlock because we updated the user + resolved = true; + return user2Data; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(true); + await originalSave(key, obj); + }) + .mockImplementation((...args) => { + return originalSave(...args); + }); + + await userState.update(() => { + return newData; + }); + await awaitAsync(); + + expect(diskStorageService.mock.save).toHaveBeenCalledTimes(2); + const innerCall = diskStorageService.mock.save.mock.calls[0]; + expect(innerCall[0]).toEqual(`user_${makeUserId("2")}_fake_fake`); + expect(innerCall[1]).toEqual(user2Data); + const outerCall = diskStorageService.mock.save.mock.calls[1]; + expect(outerCall[0]).toEqual(`user_${makeUserId("1")}_fake_fake`); + expect(outerCall[1]).toEqual(newData); + }); + }); + + describe("cleanup", () => { + const newData = { date: new Date(), array: ["test"] }; + const userId = makeUserId("1"); + let userKey: string; + + beforeEach(async () => { + await changeActiveUser("1"); + userKey = userKeyBuilder(userId, testKeyDefinition); + }); + + async function assertClean() { + const emissions = trackEmissions(userState["stateSubject"]); + const initial = structuredClone(emissions); + + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + + expect(emissions).toEqual(initial); // no longer listening to storage updates + } + + it("should cleanup after last subscriber", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); // storage updates are behind a promise + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("should not cleanup if there are still subscribers", async () => { + const subscription1 = userState.state$.subscribe(); + const sub2Emissions: TestState[] = []; + const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v)); + await awaitAsync(); // storage updates are behind a promise + + subscription1.unsubscribe(); + + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + expect(userState["subscriberCount"].getValue()).toBe(1); + + // Still be listening to storage updates + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + expect(sub2Emissions).toEqual([null, newData]); + + subscription2.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("can re-initialize after cleanup", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + subscription.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + + diskStorageService.save(userKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([null, newData]); + }); + + it("should not cleanup if a subscriber joins during the cleanup delay", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + await diskStorageService.save(userKey, newData); + await awaitAsync(); + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Do not wait long enough for cleanup + await awaitAsync(cleanupDelayMs / 2); + + expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared + expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + }); + }); }); diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.ts b/libs/common/src/platform/state/implementations/default-active-user-state.ts index 3d36af1d61..ae5c25dcb4 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.ts @@ -4,12 +4,12 @@ import { map, shareReplay, switchMap, - tap, - defer, firstValueFrom, combineLatestWith, filter, timeout, + Subscription, + tap, } from "rxjs"; import { AccountService } from "../../../auth/abstractions/account.service"; @@ -31,13 +31,21 @@ const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultActiveUserState implements ActiveUserState { [activeMarker]: true; private formattedKey$: Observable; + private updatePromise: Promise | null = null; + private storageUpdateSubscription: Subscription; + private activeAccountUpdateSubscription: Subscription; + private subscriberCount = new BehaviorSubject(0); + private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); private stateSubject$ = this.stateSubject.asObservable(); - state$: Observable; + get state$() { + this.stateObservable = this.stateObservable ?? this.initializeObservable(); + return this.stateObservable; + } constructor( protected keyDefinition: KeyDefinition, @@ -51,62 +59,12 @@ export class DefaultActiveUserState implements ActiveUserState { ? userKeyBuilder(account.id, this.keyDefinition) : null, ), + tap(() => { + // We have a new key, so we should forget about previous update promises + this.updatePromise = null; + }), shareReplay({ bufferSize: 1, refCount: false }), ); - - const activeAccountData$ = this.formattedKey$.pipe( - switchMap(async (key) => { - if (key == null) { - return FAKE_DEFAULT; - } - return await getStoredValue( - key, - this.chosenStorageLocation, - this.keyDefinition.deserializer, - ); - }), - // Share the execution - shareReplay({ refCount: false, bufferSize: 1 }), - ); - - const storageUpdates$ = this.chosenStorageLocation.updates$.pipe( - combineLatestWith(this.formattedKey$), - filter(([update, key]) => key !== null && update.key === key), - switchMap(async ([update, key]) => { - if (update.updateType === "remove") { - return null; - } - const data = await getStoredValue( - key, - this.chosenStorageLocation, - this.keyDefinition.deserializer, - ); - return data; - }), - ); - - // Whomever subscribes to this data, should be notified of updated data - // if someone calls my update() method, or the active user changes. - this.state$ = defer(() => { - const accountChangeSubscription = activeAccountData$.subscribe((data) => { - this.stateSubject.next(data); - }); - const storageUpdateSubscription = storageUpdates$.subscribe((data) => { - this.stateSubject.next(data); - }); - - return this.stateSubject$.pipe( - tap({ - complete: () => { - accountChangeSubscription.unsubscribe(); - storageUpdateSubscription.unsubscribe(); - }, - }), - ); - }) - // I fake the generic here because I am filtering out the other union type - // and this makes it so that typescript understands the true type - .pipe(filter((value) => value != FAKE_DEFAULT)); } async update( @@ -114,8 +72,34 @@ export class DefaultActiveUserState implements ActiveUserState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); + try { + if (this.updatePromise != null) { + await this.updatePromise; + } + this.updatePromise = this.internalUpdate(configureState, options); + const newState = await this.updatePromise; + return newState; + } finally { + this.updatePromise = null; + } + } + + // TODO: this should be removed + async getFromState(): Promise { const key = await this.createKey(); - const currentState = await this.getGuaranteedState(key); + return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); + } + + createDerived(converter: Converter): DerivedUserState { + return new DefaultDerivedUserState(converter, this.encryptService, this); + } + + private async internalUpdate( + configureState: (state: T, dependency: TCombine) => T, + options: StateUpdateOptions, + ) { + const key = await this.createKey(); + const currentState = await this.getStateForUpdate(key); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -130,13 +114,53 @@ export class DefaultActiveUserState implements ActiveUserState { return newState; } - async getFromState(): Promise { - const key = await this.createKey(); - return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); - } + private initializeObservable() { + this.storageUpdateSubscription = this.chosenStorageLocation.updates$ + .pipe( + combineLatestWith(this.formattedKey$), + filter(([update, key]) => key !== null && update.key === key), + switchMap(async ([update, key]) => { + if (update.updateType === "remove") { + return null; + } + return await this.getState(key); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); - createDerived(converter: Converter): DerivedUserState { - return new DefaultDerivedUserState(converter, this.encryptService, this); + this.activeAccountUpdateSubscription = this.formattedKey$ + .pipe( + switchMap(async (key) => { + if (key == null) { + return FAKE_DEFAULT; + } + return await this.getState(key); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); + + this.subscriberCount.subscribe((count) => { + if (count === 0 && this.stateObservable != null) { + this.triggerCleanup(); + } + }); + + return new Observable((subscriber) => { + this.incrementSubscribers(); + + const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); + subscriber.unsubscribe = () => { + this.decrementSubscribers(); + prevUnsubscribe(); + }; + + return this.stateSubject + .pipe( + // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. + filter((i) => i !== FAKE_DEFAULT), + ) + .subscribe(subscriber); + }); } protected async createKey(): Promise { @@ -147,22 +171,47 @@ export class DefaultActiveUserState implements ActiveUserState { return formattedKey; } - protected async getGuaranteedState(key: string) { + /** For use in update methods, does not wait for update to complete before yielding state. + * The expectation is that that await is already done + */ + protected async getStateForUpdate(key: string) { const currentValue = this.stateSubject.getValue(); - return currentValue === FAKE_DEFAULT ? await this.seedInitial(key) : currentValue; + return currentValue === FAKE_DEFAULT + ? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer) + : currentValue; } - private async seedInitial(key: string): Promise { - const value = await getStoredValue( - key, - this.chosenStorageLocation, - this.keyDefinition.deserializer, - ); - this.stateSubject.next(value); - return value; + /** To be used in observables. Awaits updates to ensure they are complete */ + private async getState(key: string): Promise { + if (this.updatePromise != null) { + await this.updatePromise; + } + return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); } protected saveToStorage(key: string, data: T): Promise { return this.chosenStorageLocation.save(key, data); } + + private incrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value + 1); + } + + private decrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value - 1); + } + + private triggerCleanup() { + setTimeout(() => { + if (this.subscriberCount.value === 0) { + this.updatePromise = null; + this.storageUpdateSubscription?.unsubscribe(); + this.activeAccountUpdateSubscription?.unsubscribe(); + this.stateObservable = null; + this.subscriberCount.complete(); + this.subscriberCount = new BehaviorSubject(0); + this.stateSubject.next(FAKE_DEFAULT); + } + }, this.keyDefinition.cleanupDelayMs); + } } diff --git a/libs/common/src/platform/state/implementations/default-global-state.spec.ts b/libs/common/src/platform/state/implementations/default-global-state.spec.ts index ae6cd1adbf..f9f95c652d 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.spec.ts @@ -3,6 +3,7 @@ * @jest-environment ../shared/test.environment.ts */ +import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -28,9 +29,10 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); - +const cleanupDelayMs = 10; const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, + cleanupDelayMs, }); const globalKey = globalKeyBuilder(testKeyDefinition); @@ -79,6 +81,19 @@ describe("DefaultGlobalState", () => { expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined); expect(state).toBeTruthy(); }); + + it("should not emit twice if there are two listeners", async () => { + const emissions = trackEmissions(globalState.state$); + const emissions2 = trackEmissions(globalState.state$); + await awaitAsync(); + + expect(emissions).toEqual([ + null, // Initial value + ]); + expect(emissions2).toEqual([ + null, // Initial value + ]); + }); }); describe("update", () => { @@ -133,6 +148,7 @@ describe("DefaultGlobalState", () => { it("should not update if shouldUpdate returns false", async () => { const emissions = trackEmissions(globalState.state$); + await awaitAsync(); // storage updates are behind a promise const result = await globalState.update( (state) => { @@ -198,4 +214,194 @@ describe("DefaultGlobalState", () => { expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); }); }); + + describe("update races", () => { + test("subscriptions during an update should receive the current and latest data", async () => { + const oldData = { date: new Date(2019, 1, 1) }; + await globalState.update(() => { + return oldData; + }); + const initialData = { date: new Date(2020, 1, 1) }; + await globalState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(globalState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { + emissions2 = trackEmissions(globalState.state$); + await originalSave(key, obj); + }); + + const val = await globalState.update(() => { + return newData; + }); + + await awaitAsync(10); + + expect(val).toEqual(newData); + expect(emissions).toEqual([initialData, newData]); + expect(emissions2).toEqual([initialData, newData]); + }); + + test("subscription during an aborted update should receive the last value", async () => { + // Seed with interesting data + const initialData = { date: new Date(2020, 1, 1) }; + await globalState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(globalState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const val = await globalState.update( + () => { + return newData; + }, + { + shouldUpdate: () => { + emissions2 = trackEmissions(globalState.state$); + return false; + }, + }, + ); + + await awaitAsync(); + + expect(val).toEqual(initialData); + expect(emissions).toEqual([initialData]); + + expect(emissions2).toEqual([initialData]); + }); + + test("updates should wait until previous update is complete", async () => { + trackEmissions(globalState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async () => { + let resolved = false; + await Promise.race([ + globalState.update(() => { + // deadlocks + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(false); + }) + .mockImplementation(originalSave); + + await globalState.update((state) => { + return newData; + }); + }); + + test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { + expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + const val = await globalState.update((state) => { + return newData; + }); + + expect(val).toEqual(newData); + const call = diskStorageService.mock.save.mock.calls[0]; + expect(call[0]).toEqual("global_fake_fake"); + expect(call[1]).toEqual(newData); + }); + }); + + describe("cleanup", () => { + async function assertClean() { + const emissions = trackEmissions(globalState["stateSubject"]); + const initial = structuredClone(emissions); + + diskStorageService.save(globalKey, newData); + await awaitAsync(); // storage updates are behind a promise + + expect(emissions).toEqual(initial); // no longer listening to storage updates + } + + it("should cleanup after last subscriber", async () => { + const subscription = globalState.state$.subscribe(); + await awaitAsync(); // storage updates are behind a promise + + subscription.unsubscribe(); + expect(globalState["subscriberCount"].getValue()).toBe(0); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("should not cleanup if there are still subscribers", async () => { + const subscription1 = globalState.state$.subscribe(); + const sub2Emissions: TestState[] = []; + const subscription2 = globalState.state$.subscribe((v) => sub2Emissions.push(v)); + await awaitAsync(); // storage updates are behind a promise + + subscription1.unsubscribe(); + + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + expect(globalState["subscriberCount"].getValue()).toBe(1); + + // Still be listening to storage updates + diskStorageService.save(globalKey, newData); + await awaitAsync(); // storage updates are behind a promise + expect(sub2Emissions).toEqual([null, newData]); + + subscription2.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("can re-initialize after cleanup", async () => { + const subscription = globalState.state$.subscribe(); + await awaitAsync(); + + subscription.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + const emissions = trackEmissions(globalState.state$); + await awaitAsync(); + + diskStorageService.save(globalKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([null, newData]); + }); + + it("should not cleanup if a subscriber joins during the cleanup delay", async () => { + const subscription = globalState.state$.subscribe(); + await awaitAsync(); + + await diskStorageService.save(globalKey, newData); + await awaitAsync(); + + subscription.unsubscribe(); + expect(globalState["subscriberCount"].getValue()).toBe(0); + // Do not wait long enough for cleanup + await awaitAsync(cleanupDelayMs / 2); + + expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared + expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + }); + }); }); diff --git a/libs/common/src/platform/state/implementations/default-global-state.ts b/libs/common/src/platform/state/implementations/default-global-state.ts index 8e08717f72..39430799a8 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.ts @@ -1,12 +1,10 @@ import { BehaviorSubject, Observable, - defer, + Subscription, filter, firstValueFrom, - shareReplay, switchMap, - tap, timeout, } from "rxjs"; @@ -23,54 +21,25 @@ const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultGlobalState implements GlobalState { private storageKey: string; + private updatePromise: Promise | null = null; + private storageUpdateSubscription: Subscription; + private subscriberCount = new BehaviorSubject(0); + private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); - state$: Observable; + get state$() { + this.stateObservable = this.stateObservable ?? this.initializeObservable(); + return this.stateObservable; + } constructor( private keyDefinition: KeyDefinition, private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = globalKeyBuilder(this.keyDefinition); - - const storageUpdates$ = this.chosenLocation.updates$.pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await getStoredValue( - this.storageKey, - this.chosenLocation, - this.keyDefinition.deserializer, - ); - }), - shareReplay({ bufferSize: 1, refCount: false }), - ); - - this.state$ = defer(() => { - const storageUpdateSubscription = storageUpdates$.subscribe((value) => { - this.stateSubject.next(value); - }); - - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return this.stateSubject.pipe( - tap({ - complete: () => { - storageUpdateSubscription.unsubscribe(); - }, - }), - ); - }).pipe( - shareReplay({ refCount: false, bufferSize: 1 }), - filter((i) => i != FAKE_DEFAULT), - ); } async update( @@ -78,7 +47,24 @@ export class DefaultGlobalState implements GlobalState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); - const currentState = await this.getGuaranteedState(); + if (this.updatePromise != null) { + await this.updatePromise; + } + + try { + this.updatePromise = this.internalUpdate(configureState, options); + const newState = await this.updatePromise; + return newState; + } finally { + this.updatePromise = null; + } + } + + private async internalUpdate( + configureState: (state: T, dependency: TCombine) => T, + options: StateUpdateOptions, + ): Promise { + const currentState = await this.getStateForUpdate(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -93,16 +79,86 @@ export class DefaultGlobalState implements GlobalState { return newState; } - private async getGuaranteedState() { + private initializeObservable() { + this.storageUpdateSubscription = this.chosenLocation.updates$ + .pipe( + filter((update) => update.key === this.storageKey), + switchMap(async (update) => { + if (update.updateType === "remove") { + return null; + } + return await this.getFromState(); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); + + this.subscriberCount.subscribe((count) => { + if (count === 0 && this.stateObservable != null) { + this.triggerCleanup(); + } + }); + + // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to + // trigger populating it immediately. + this.getFromState().then((s) => { + this.stateSubject.next(s); + }); + + return new Observable((subscriber) => { + this.incrementSubscribers(); + + const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); + subscriber.unsubscribe = () => { + this.decrementSubscribers(); + prevUnsubscribe(); + }; + + return this.stateSubject + .pipe( + // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. + filter((i) => i != FAKE_DEFAULT), + ) + .subscribe(subscriber); + }); + } + + /** For use in update methods, does not wait for update to complete before yielding state. + * The expectation is that that await is already done + */ + private async getStateForUpdate() { const currentValue = this.stateSubject.getValue(); return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; } async getFromState(): Promise { + if (this.updatePromise != null) { + return await this.updatePromise; + } return await getStoredValue( this.storageKey, this.chosenLocation, this.keyDefinition.deserializer, ); } + + private incrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value + 1); + } + + private decrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value - 1); + } + + private triggerCleanup() { + setTimeout(() => { + if (this.subscriberCount.value === 0) { + this.updatePromise = null; + this.storageUpdateSubscription.unsubscribe(); + this.stateObservable = null; + this.subscriberCount.complete(); + this.subscriberCount = new BehaviorSubject(0); + this.stateSubject.next(FAKE_DEFAULT); + } + }, this.keyDefinition.cleanupDelayMs); + } } diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts index a25ee863e6..1c24c5f48c 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts @@ -3,6 +3,7 @@ * @jest-environment ../shared/test.environment.ts */ +import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -30,21 +31,22 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); - +const cleanupDelayMs = 10; const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, + cleanupDelayMs, }); const userId = Utils.newGuid() as UserId; const userKey = userKeyBuilder(userId, testKeyDefinition); describe("DefaultSingleUserState", () => { let diskStorageService: FakeStorageService; - let globalState: DefaultSingleUserState; + let userState: DefaultSingleUserState; const newData = { date: new Date() }; beforeEach(() => { diskStorageService = new FakeStorageService(); - globalState = new DefaultSingleUserState( + userState = new DefaultSingleUserState( userId, testKeyDefinition, null, // Not testing anything with encrypt service @@ -58,7 +60,7 @@ describe("DefaultSingleUserState", () => { describe("state$", () => { it("should emit when storage updates", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await diskStorageService.save(userKey, newData); await awaitAsync(); @@ -69,7 +71,7 @@ describe("DefaultSingleUserState", () => { }); it("should not emit when update key does not match", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await diskStorageService.save("wrong_key", newData); expect(emissions).toHaveLength(0); @@ -82,7 +84,7 @@ describe("DefaultSingleUserState", () => { }); diskStorageService.internalUpdateStore(initialStorage); - const state = await firstValueFrom(globalState.state$); + const state = await firstValueFrom(userState.state$); expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1); expect(diskStorageService.mock.get).toHaveBeenCalledWith( `user_${userId}_fake_fake`, @@ -94,7 +96,7 @@ describe("DefaultSingleUserState", () => { describe("update", () => { it("should save on update", async () => { - const result = await globalState.update((state) => { + const result = await userState.update((state) => { return newData; }); @@ -103,10 +105,10 @@ describe("DefaultSingleUserState", () => { }); it("should emit once per update", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise - await globalState.update((state) => { + await userState.update((state) => { return newData; }); @@ -119,12 +121,12 @@ describe("DefaultSingleUserState", () => { }); it("should provided combined dependencies", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise const combinedDependencies = { date: new Date() }; - await globalState.update( + await userState.update( (state, dependencies) => { expect(dependencies).toEqual(combinedDependencies); return newData; @@ -143,9 +145,10 @@ describe("DefaultSingleUserState", () => { }); it("should not update if shouldUpdate returns false", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise - const result = await globalState.update( + const result = await userState.update( (state) => { return newData; }, @@ -160,18 +163,18 @@ describe("DefaultSingleUserState", () => { }); it("should provide the update callback with the current State", async () => { - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise // Seed with interesting data const initialData = { date: new Date(2020, 1, 1) }; - await globalState.update((state, dependencies) => { + await userState.update((state, dependencies) => { return initialData; }); await awaitAsync(); - await globalState.update((state) => { + await userState.update((state) => { expect(state).toEqual(initialData); return newData; }); @@ -193,14 +196,14 @@ describe("DefaultSingleUserState", () => { initialStorage[userKey] = initialState; diskStorageService.internalUpdateStore(initialStorage); - const emissions = trackEmissions(globalState.state$); + const emissions = trackEmissions(userState.state$); await awaitAsync(); // storage updates are behind a promise const newState = { ...initialState, date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1), }; - const actual = await globalState.update((existingState) => newState); + const actual = await userState.update((existingState) => newState); await awaitAsync(); @@ -209,4 +212,194 @@ describe("DefaultSingleUserState", () => { expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); }); }); + + describe("update races", () => { + test("subscriptions during an update should receive the current and latest data", async () => { + const oldData = { date: new Date(2019, 1, 1) }; + await userState.update(() => { + return oldData; + }); + const initialData = { date: new Date(2020, 1, 1) }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { + emissions2 = trackEmissions(userState.state$); + await originalSave(key, obj); + }); + + const val = await userState.update(() => { + return newData; + }); + + await awaitAsync(10); + + expect(val).toEqual(newData); + expect(emissions).toEqual([initialData, newData]); + expect(emissions2).toEqual([initialData, newData]); + }); + + test("subscription during an aborted update should receive the last value", async () => { + // Seed with interesting data + const initialData = { date: new Date(2020, 1, 1) }; + await userState.update(() => { + return initialData; + }); + + await awaitAsync(); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + expect(emissions).toEqual([initialData]); + + let emissions2: TestState[]; + const val = await userState.update( + (state) => { + return newData; + }, + { + shouldUpdate: () => { + emissions2 = trackEmissions(userState.state$); + return false; + }, + }, + ); + + await awaitAsync(); + + expect(val).toEqual(initialData); + expect(emissions).toEqual([initialData]); + + expect(emissions2).toEqual([initialData]); + }); + + test("updates should wait until previous update is complete", async () => { + trackEmissions(userState.state$); + await awaitAsync(); // storage updates are behind a promise + + const originalSave = diskStorageService.save.bind(diskStorageService); + diskStorageService.save = jest + .fn() + .mockImplementationOnce(async () => { + let resolved = false; + await Promise.race([ + userState.update(() => { + // deadlocks + resolved = true; + return newData; + }), + awaitAsync(100), // limit test to 100ms + ]); + expect(resolved).toBe(false); + }) + .mockImplementation(originalSave); + + await userState.update((state) => { + return newData; + }); + }); + + test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { + expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + const val = await userState.update((state) => { + return newData; + }); + + expect(val).toEqual(newData); + const call = diskStorageService.mock.save.mock.calls[0]; + expect(call[0]).toEqual(`user_${userId}_fake_fake`); + expect(call[1]).toEqual(newData); + }); + }); + + describe("cleanup", () => { + async function assertClean() { + const emissions = trackEmissions(userState["stateSubject"]); + const initial = structuredClone(emissions); + + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + + expect(emissions).toEqual(initial); // no longer listening to storage updates + } + + it("should cleanup after last subscriber", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); // storage updates are behind a promise + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("should not cleanup if there are still subscribers", async () => { + const subscription1 = userState.state$.subscribe(); + const sub2Emissions: TestState[] = []; + const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v)); + await awaitAsync(); // storage updates are behind a promise + + subscription1.unsubscribe(); + + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + expect(userState["subscriberCount"].getValue()).toBe(1); + + // Still be listening to storage updates + diskStorageService.save(userKey, newData); + await awaitAsync(); // storage updates are behind a promise + expect(sub2Emissions).toEqual([null, newData]); + + subscription2.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + await assertClean(); + }); + + it("can re-initialize after cleanup", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + subscription.unsubscribe(); + // Wait for cleanup + await awaitAsync(cleanupDelayMs * 2); + + const emissions = trackEmissions(userState.state$); + await awaitAsync(); + + diskStorageService.save(userKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([null, newData]); + }); + + it("should not cleanup if a subscriber joins during the cleanup delay", async () => { + const subscription = userState.state$.subscribe(); + await awaitAsync(); + + await diskStorageService.save(userKey, newData); + await awaitAsync(); + + subscription.unsubscribe(); + expect(userState["subscriberCount"].getValue()).toBe(0); + // Do not wait long enough for cleanup + await awaitAsync(cleanupDelayMs / 2); + + expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared + expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + }); + }); }); diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.ts b/libs/common/src/platform/state/implementations/default-single-user-state.ts index 46fa00ffb3..0e9cacae51 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.ts @@ -1,12 +1,10 @@ import { BehaviorSubject, Observable, - defer, + Subscription, filter, firstValueFrom, - shareReplay, switchMap, - tap, timeout, } from "rxjs"; @@ -23,16 +21,24 @@ import { Converter, SingleUserState } from "../user-state"; import { DefaultDerivedUserState } from "./default-derived-state"; import { getStoredValue } from "./util"; + const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultSingleUserState implements SingleUserState { private storageKey: string; + private updatePromise: Promise | null = null; + private storageUpdateSubscription: Subscription; + private subscriberCount = new BehaviorSubject(0); + private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); - state$: Observable; + get state$() { + this.stateObservable = this.stateObservable ?? this.initializeObservable(); + return this.stateObservable; + } constructor( readonly userId: UserId, @@ -41,42 +47,6 @@ export class DefaultSingleUserState implements SingleUserState { private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = userKeyBuilder(this.userId, this.keyDefinition); - - const storageUpdates$ = this.chosenLocation.updates$.pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await getStoredValue( - this.storageKey, - this.chosenLocation, - this.keyDefinition.deserializer, - ); - }), - shareReplay({ bufferSize: 1, refCount: false }), - ); - - this.state$ = defer(() => { - const storageUpdateSubscription = storageUpdates$.subscribe((value) => { - this.stateSubject.next(value); - }); - - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return this.stateSubject.pipe( - tap({ - complete: () => { - storageUpdateSubscription.unsubscribe(); - }, - }), - ); - }).pipe( - shareReplay({ refCount: false, bufferSize: 1 }), - filter((i) => i != FAKE_DEFAULT), - ); } async update( @@ -84,7 +54,28 @@ export class DefaultSingleUserState implements SingleUserState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); - const currentState = await this.getGuaranteedState(); + if (this.updatePromise != null) { + await this.updatePromise; + } + + try { + this.updatePromise = this.internalUpdate(configureState, options); + const newState = await this.updatePromise; + return newState; + } finally { + this.updatePromise = null; + } + } + + createDerived(converter: Converter): DerivedUserState { + return new DefaultDerivedUserState(converter, this.encryptService, this); + } + + private async internalUpdate( + configureState: (state: T, dependency: TCombine) => T, + options: StateUpdateOptions, + ): Promise { + const currentState = await this.getStateForUpdate(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -99,20 +90,86 @@ export class DefaultSingleUserState implements SingleUserState { return newState; } - createDerived(converter: Converter): DerivedUserState { - return new DefaultDerivedUserState(converter, this.encryptService, this); + private initializeObservable() { + this.storageUpdateSubscription = this.chosenLocation.updates$ + .pipe( + filter((update) => update.key === this.storageKey), + switchMap(async (update) => { + if (update.updateType === "remove") { + return null; + } + return await this.getFromState(); + }), + ) + .subscribe((v) => this.stateSubject.next(v)); + + this.subscriberCount.subscribe((count) => { + if (count === 0 && this.stateObservable != null) { + this.triggerCleanup(); + } + }); + + // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to + // trigger populating it immediately. + this.getFromState().then((s) => { + this.stateSubject.next(s); + }); + + return new Observable((subscriber) => { + this.incrementSubscribers(); + + const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); + subscriber.unsubscribe = () => { + this.decrementSubscribers(); + prevUnsubscribe(); + }; + + return this.stateSubject + .pipe( + // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. + filter((i) => i != FAKE_DEFAULT), + ) + .subscribe(subscriber); + }); } - private async getGuaranteedState() { + /** For use in update methods, does not wait for update to complete before yielding state. + * The expectation is that that await is already done + */ + private async getStateForUpdate() { const currentValue = this.stateSubject.getValue(); return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; } async getFromState(): Promise { + if (this.updatePromise != null) { + return await this.updatePromise; + } return await getStoredValue( this.storageKey, this.chosenLocation, this.keyDefinition.deserializer, ); } + + private incrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value + 1); + } + + private decrementSubscribers() { + this.subscriberCount.next(this.subscriberCount.value - 1); + } + + private triggerCleanup() { + setTimeout(() => { + if (this.subscriberCount.value === 0) { + this.updatePromise = null; + this.storageUpdateSubscription.unsubscribe(); + this.stateObservable = null; + this.subscriberCount.complete(); + this.subscriberCount = new BehaviorSubject(0); + this.stateSubject.next(FAKE_DEFAULT); + } + }, this.keyDefinition.cleanupDelayMs); + } } diff --git a/libs/common/src/platform/state/key-definition.spec.ts b/libs/common/src/platform/state/key-definition.spec.ts index cbb1e49a9a..ee926bccd8 100644 --- a/libs/common/src/platform/state/key-definition.spec.ts +++ b/libs/common/src/platform/state/key-definition.spec.ts @@ -18,6 +18,37 @@ describe("KeyDefinition", () => { }); }); + describe("cleanupDelayMs", () => { + it("defaults to 1000ms", () => { + const keyDefinition = new KeyDefinition(fakeStateDefinition, "fake", { + deserializer: (value) => value, + }); + + expect(keyDefinition).toBeTruthy(); + expect(keyDefinition.cleanupDelayMs).toBe(1000); + }); + + it("can be overridden", () => { + const keyDefinition = new KeyDefinition(fakeStateDefinition, "fake", { + deserializer: (value) => value, + cleanupDelayMs: 500, + }); + + expect(keyDefinition).toBeTruthy(); + expect(keyDefinition.cleanupDelayMs).toBe(500); + }); + + it.each([0, -1])("throws on 0 or negative (%s)", (testValue: number) => { + expect( + () => + new KeyDefinition(fakeStateDefinition, "fake", { + deserializer: (value) => value, + cleanupDelayMs: testValue, + }), + ).toThrow(); + }); + }); + describe("record", () => { it("runs custom deserializer for each record value", () => { const recordDefinition = KeyDefinition.record(fakeStateDefinition, "fake", { diff --git a/libs/common/src/platform/state/key-definition.ts b/libs/common/src/platform/state/key-definition.ts index db65740388..9989bf37a2 100644 --- a/libs/common/src/platform/state/key-definition.ts +++ b/libs/common/src/platform/state/key-definition.ts @@ -19,6 +19,11 @@ type KeyDefinitionOptions = { * @returns The fully typed version of your state. */ readonly deserializer: (jsonValue: Jsonify) => T; + /** + * The number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed. + * Defaults to 1000ms. + */ + readonly cleanupDelayMs?: number; }; /** @@ -42,8 +47,12 @@ export class KeyDefinition { private readonly options: KeyDefinitionOptions, ) { if (options.deserializer == null) { + throw new Error(`'deserializer' is a required property on key ${this.errorKeyName}`); + } + + if (options.cleanupDelayMs <= 0) { throw new Error( - `'deserializer' is a required property on key ${stateDefinition.name} > ${key}`, + `'cleanupDelayMs' must be greater than 0. Value of ${options.cleanupDelayMs} passed to key ${this.errorKeyName} `, ); } } @@ -55,6 +64,13 @@ export class KeyDefinition { return this.options.deserializer; } + /** + * Gets the number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed. + */ + get cleanupDelayMs() { + return this.options.cleanupDelayMs < 0 ? 0 : this.options.cleanupDelayMs ?? 1000; + } + /** * Creates a {@link KeyDefinition} for state that is an array. * @param stateDefinition The state definition to be added to the KeyDefinition @@ -137,6 +153,10 @@ export class KeyDefinition { ? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}` : `${scope}_${this.stateDefinition.name}_${this.key}`; } + + private get errorKeyName() { + return `${this.stateDefinition.name} > ${this.key}`; + } } export type StorageKey = Opaque;