From 5e11cb212dc25b79924d1d64d497d53d523ac0f4 Mon Sep 17 00:00:00 2001 From: Justin Baur <19896123+justindbaur@users.noreply.github.com> Date: Thu, 4 Jan 2024 16:30:20 -0500 Subject: [PATCH] Combined State (#7383) * Introduce Combined State * Cleanup Test * Update Fakes * Address PR Feedback * Update libs/common/src/platform/state/implementations/default-active-user-state.ts Co-authored-by: Matt Gibson * Prettier * Get rid of ReplaySubject reference --------- Co-authored-by: Matt Gibson --- .../browser/src/background/main.background.ts | 2 - .../active-user-state-provider.factory.ts | 3 - .../single-user-state-provider.factory.ts | 3 - apps/cli/src/bw.ts | 2 - .../src/services/jslib-services.module.ts | 9 +- libs/common/spec/fake-state.ts | 38 +-- .../default-active-user-state.provider.ts | 3 - .../default-active-user-state.spec.ts | 81 +++--- .../default-active-user-state.ts | 234 +++++++----------- .../default-single-user-state.provider.ts | 3 - .../default-single-user-state.spec.ts | 87 +++++-- .../default-single-user-state.ts | 142 +++-------- libs/common/src/platform/state/user-state.ts | 12 +- 13 files changed, 280 insertions(+), 339 deletions(-) diff --git a/apps/browser/src/background/main.background.ts b/apps/browser/src/background/main.background.ts index a8fb5947e4..2eda6e31ca 100644 --- a/apps/browser/src/background/main.background.ts +++ b/apps/browser/src/background/main.background.ts @@ -321,7 +321,6 @@ export default class MainBackground { : new EncryptServiceImplementation(this.cryptoFunctionService, this.logService, true); this.singleUserStateProvider = new DefaultSingleUserStateProvider( - this.encryptService, this.memoryStorageService as BackgroundMemoryStorageService, this.storageService as BrowserLocalStorageService, ); @@ -332,7 +331,6 @@ export default class MainBackground { ); this.activeUserStateProvider = new DefaultActiveUserStateProvider( this.accountService, - this.encryptService, this.memoryStorageService as BackgroundMemoryStorageService, this.storageService as BrowserLocalStorageService, ); diff --git a/apps/browser/src/platform/background/service-factories/active-user-state-provider.factory.ts b/apps/browser/src/platform/background/service-factories/active-user-state-provider.factory.ts index d63dac7099..e2d8746c7c 100644 --- a/apps/browser/src/platform/background/service-factories/active-user-state-provider.factory.ts +++ b/apps/browser/src/platform/background/service-factories/active-user-state-provider.factory.ts @@ -7,7 +7,6 @@ import { accountServiceFactory, } from "../../../auth/background/service-factories/account-service.factory"; -import { EncryptServiceInitOptions, encryptServiceFactory } from "./encrypt-service.factory"; import { CachedServices, FactoryOptions, factory } from "./factory-options"; import { DiskStorageServiceInitOptions, @@ -20,7 +19,6 @@ type ActiveUserStateProviderFactory = FactoryOptions; export type ActiveUserStateProviderInitOptions = ActiveUserStateProviderFactory & AccountServiceInitOptions & - EncryptServiceInitOptions & MemoryStorageServiceInitOptions & DiskStorageServiceInitOptions; @@ -35,7 +33,6 @@ export async function activeUserStateProviderFactory( async () => new DefaultActiveUserStateProvider( await accountServiceFactory(cache, opts), - await encryptServiceFactory(cache, opts), await observableMemoryStorageServiceFactory(cache, opts), await observableDiskStorageServiceFactory(cache, opts), ), diff --git a/apps/browser/src/platform/background/service-factories/single-user-state-provider.factory.ts b/apps/browser/src/platform/background/service-factories/single-user-state-provider.factory.ts index 06cca7363e..0af6f82214 100644 --- a/apps/browser/src/platform/background/service-factories/single-user-state-provider.factory.ts +++ b/apps/browser/src/platform/background/service-factories/single-user-state-provider.factory.ts @@ -2,7 +2,6 @@ import { SingleUserStateProvider } from "@bitwarden/common/platform/state"; // eslint-disable-next-line import/no-restricted-paths -- We need the implementation to inject, but generally this should not be accessed import { DefaultSingleUserStateProvider } from "@bitwarden/common/platform/state/implementations/default-single-user-state.provider"; -import { EncryptServiceInitOptions, encryptServiceFactory } from "./encrypt-service.factory"; import { CachedServices, FactoryOptions, factory } from "./factory-options"; import { DiskStorageServiceInitOptions, @@ -14,7 +13,6 @@ import { type SingleUserStateProviderFactoryOptions = FactoryOptions; export type SingleUserStateProviderInitOptions = SingleUserStateProviderFactoryOptions & - EncryptServiceInitOptions & MemoryStorageServiceInitOptions & DiskStorageServiceInitOptions; @@ -28,7 +26,6 @@ export async function singleUserStateProviderFactory( opts, async () => new DefaultSingleUserStateProvider( - await encryptServiceFactory(cache, opts), await observableMemoryStorageServiceFactory(cache, opts), await observableDiskStorageServiceFactory(cache, opts), ), diff --git a/apps/cli/src/bw.ts b/apps/cli/src/bw.ts index 4572c16879..02be59da0a 100644 --- a/apps/cli/src/bw.ts +++ b/apps/cli/src/bw.ts @@ -226,7 +226,6 @@ export class Main { ); this.singleUserStateProvider = new DefaultSingleUserStateProvider( - this.encryptService, this.memoryStorageService, this.storageService, ); @@ -241,7 +240,6 @@ export class Main { this.activeUserStateProvider = new DefaultActiveUserStateProvider( this.accountService, - this.encryptService, this.memoryStorageService, this.storageService, ); diff --git a/libs/angular/src/services/jslib-services.module.ts b/libs/angular/src/services/jslib-services.module.ts index 7a58aff953..408bce8dbe 100644 --- a/libs/angular/src/services/jslib-services.module.ts +++ b/libs/angular/src/services/jslib-services.module.ts @@ -800,17 +800,12 @@ import { ModalService } from "./modal.service"; { provide: ActiveUserStateProvider, useClass: DefaultActiveUserStateProvider, - deps: [ - AccountServiceAbstraction, - EncryptService, - OBSERVABLE_MEMORY_STORAGE, - OBSERVABLE_DISK_STORAGE, - ], + deps: [AccountServiceAbstraction, OBSERVABLE_MEMORY_STORAGE, OBSERVABLE_DISK_STORAGE], }, { provide: SingleUserStateProvider, useClass: DefaultSingleUserStateProvider, - deps: [EncryptService, OBSERVABLE_MEMORY_STORAGE, OBSERVABLE_DISK_STORAGE], + deps: [OBSERVABLE_MEMORY_STORAGE, OBSERVABLE_DISK_STORAGE], }, { provide: DerivedStateProvider, diff --git a/libs/common/spec/fake-state.ts b/libs/common/spec/fake-state.ts index b3ea80d3e4..b56c129c62 100644 --- a/libs/common/spec/fake-state.ts +++ b/libs/common/spec/fake-state.ts @@ -1,10 +1,10 @@ -import { ReplaySubject, firstValueFrom, timeout } from "rxjs"; +import { Observable, ReplaySubject, firstValueFrom, map, timeout } from "rxjs"; import { DerivedState, GlobalState, SingleUserState, ActiveUserState } from "../src/platform/state"; // eslint-disable-next-line import/no-restricted-paths -- using unexposed options for clean typing in test class import { StateUpdateOptions } from "../src/platform/state/state-update-options"; // eslint-disable-next-line import/no-restricted-paths -- using unexposed options for clean typing in test class -import { UserState, activeMarker } from "../src/platform/state/user-state"; +import { CombinedState, UserState, activeMarker } from "../src/platform/state/user-state"; import { UserId } from "../src/types/guid"; const DEFAULT_TEST_OPTIONS: StateUpdateOptions = { @@ -57,9 +57,19 @@ export class FakeGlobalState implements GlobalState { } } -export class FakeUserState implements UserState { +abstract class FakeUserState implements UserState { // eslint-disable-next-line rxjs/no-exposed-subjects -- exposed for testing setup - stateSubject = new ReplaySubject(1); + stateSubject = new ReplaySubject>(1); + + protected userId: UserId; + + state$: Observable; + combinedState$: Observable>; + + constructor() { + this.combinedState$ = this.stateSubject.asObservable(); + this.state$ = this.combinedState$.pipe(map(([_userId, state]) => state)); + } update: ( configureState: (state: T, dependency: TCombine) => T, @@ -75,34 +85,24 @@ export class FakeUserState implements UserState { return current; } const newState = configureState(current, combinedDependencies); - this.stateSubject.next(newState); + this.stateSubject.next([this.userId, newState]); return newState; }); updateMock = this.update as jest.MockedFunction; - - updateFor: ( - userId: UserId, - configureState: (state: T, dependency: TCombine) => T, - options?: StateUpdateOptions, - ) => Promise = jest.fn(); - - getFromState: () => Promise = jest.fn(async () => { - return await firstValueFrom(this.state$.pipe(timeout(10))); - }); - - get state$() { - return this.stateSubject.asObservable(); - } } export class FakeSingleUserState extends FakeUserState implements SingleUserState { constructor(readonly userId: UserId) { super(); + this.userId = userId; } } export class FakeActiveUserState extends FakeUserState implements ActiveUserState { [activeMarker]: true; + changeActiveUser(userId: UserId) { + this.userId = userId; + } } export class FakeDerivedState implements DerivedState { diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.provider.ts b/libs/common/src/platform/state/implementations/default-active-user-state.provider.ts index 0950918fc5..ca2137efc1 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.provider.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.provider.ts @@ -1,5 +1,4 @@ import { AccountService } from "../../../auth/abstractions/account.service"; -import { EncryptService } from "../../abstractions/encrypt.service"; import { AbstractMemoryStorageService, AbstractStorageService, @@ -17,7 +16,6 @@ export class DefaultActiveUserStateProvider implements ActiveUserStateProvider { constructor( protected accountService: AccountService, - protected encryptService: EncryptService, protected memoryStorage: AbstractMemoryStorageService & ObservableStorageService, protected diskStorage: AbstractStorageService & ObservableStorageService, ) {} @@ -40,7 +38,6 @@ export class DefaultActiveUserStateProvider implements ActiveUserStateProvider { return new DefaultActiveUserState( keyDefinition, this.accountService, - this.encryptService, this.getLocation(keyDefinition.stateDefinition.storageLocation), ); } diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts index 64c1d1b233..8a1143e29f 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts @@ -2,7 +2,7 @@ * need to update test environment so trackEmissions works appropriately * @jest-environment ../shared/test.environment.ts */ -import { any, anySymbol, mock } from "jest-mock-extended"; +import { any, mock } from "jest-mock-extended"; import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs"; import { Jsonify } from "type-fest"; @@ -49,12 +49,7 @@ describe("DefaultActiveUserState", () => { accountService.activeAccount$ = activeAccountSubject; diskStorageService = new FakeStorageService(); - userState = new DefaultActiveUserState( - testKeyDefinition, - accountService, - null, // Not testing anything with encrypt service - diskStorageService, - ); + userState = new DefaultActiveUserState(testKeyDefinition, accountService, diskStorageService); }); const makeUserId = (id: string) => { @@ -81,11 +76,11 @@ describe("DefaultActiveUserState", () => { const user2 = "user_00000000-0000-1000-a000-000000000002_fake_fake"; const state1 = { date: new Date(2021, 0), - array: ["value1"], + array: ["user1"], }; const state2 = { date: new Date(2022, 0), - array: ["value2"], + array: ["user2"], }; const initialState: Record = {}; initialState[user1] = state1; @@ -96,12 +91,11 @@ describe("DefaultActiveUserState", () => { // User signs in await changeActiveUser("1"); - await awaitAsync(); // Service does an update const updatedState = { date: new Date(2023, 0), - array: ["value3"], + array: ["user1-update"], }; await userState.update(() => updatedState); await awaitAsync(); @@ -109,6 +103,9 @@ describe("DefaultActiveUserState", () => { // Emulate an account switch await changeActiveUser("2"); + // #1 initial state from user1 + // #2 updated state for user1 + // #3 switched state to initial state for user2 expect(emissions).toEqual([state1, updatedState, state2]); // Should be called three time to get state, once for each user and once for the update @@ -352,6 +349,35 @@ describe("DefaultActiveUserState", () => { newData, ]); }); + + it("should throw on an attempted update when there is no active user", async () => { + await changeActiveUser(undefined); + + await expect(async () => await userState.update(() => null)).rejects.toThrow( + "No active user at this time.", + ); + }); + + it("should throw on an attempted update where there is no active user even if there used to be one", async () => { + // Arrange + diskStorageService.internalUpdateStore({ + "user_00000000-0000-1000-a000-000000000001_fake_fake": { + date: new Date(2019, 1), + array: [], + }, + }); + + const [userId, state] = await firstValueFrom(userState.combinedState$); + expect(userId).toBe("00000000-0000-1000-a000-000000000001"); + expect(state.date.getUTCFullYear()).toBe(2019); + + await changeActiveUser(undefined); + // Act + + expect(async () => await userState.update(() => null)).rejects.toThrow( + "No active user at this time.", + ); + }); }); describe("update races", () => { @@ -460,7 +486,7 @@ describe("DefaultActiveUserState", () => { }); test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { - expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0); const val = await userState.update((state) => { return newData; }); @@ -554,14 +580,9 @@ describe("DefaultActiveUserState", () => { userKey = userKeyBuilder(userId, testKeyDefinition); }); - async function assertClean() { - const emissions = trackEmissions(userState["stateSubject"]); - const initial = structuredClone(emissions); - - diskStorageService.save(userKey, newData); - await awaitAsync(); // storage updates are behind a promise - - expect(emissions).toEqual(initial); // no longer listening to storage updates + function assertClean() { + expect(activeAccountSubject["observers"]).toHaveLength(0); + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0); } it("should cleanup after last subscriber", async () => { @@ -569,11 +590,11 @@ describe("DefaultActiveUserState", () => { await awaitAsync(); // storage updates are behind a promise subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1); // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - await assertClean(); + assertClean(); }); it("should not cleanup if there are still subscribers", async () => { @@ -587,7 +608,7 @@ describe("DefaultActiveUserState", () => { // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - expect(userState["subscriberCount"].getValue()).toBe(1); + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1); // Still be listening to storage updates diskStorageService.save(userKey, newData); @@ -598,7 +619,7 @@ describe("DefaultActiveUserState", () => { // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - await assertClean(); + assertClean(); }); it("can re-initialize after cleanup", async () => { @@ -612,7 +633,7 @@ describe("DefaultActiveUserState", () => { const emissions = trackEmissions(userState.state$); await awaitAsync(); - diskStorageService.save(userKey, newData); + await diskStorageService.save(userKey, newData); await awaitAsync(); expect(emissions).toEqual([null, newData]); @@ -626,12 +647,16 @@ describe("DefaultActiveUserState", () => { await awaitAsync(); subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); // Do not wait long enough for cleanup await awaitAsync(cleanupDelayMs / 2); - expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared - expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + const state = await firstValueFrom(userState.state$); + + expect(state).toEqual(newData); // digging in to check that it hasn't been cleared + + // Should be called once for the initial subscription and once from the save + // but should NOT be called for the second subscription from the `firstValueFrom` + expect(diskStorageService.mock.get).toHaveBeenCalledTimes(2); }); it("state$ observables are durable to cleanup", async () => { diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.ts b/libs/common/src/platform/state/implementations/default-active-user-state.ts index 51688be338..595717d3f2 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.ts @@ -1,69 +1,120 @@ import { Observable, - BehaviorSubject, map, - shareReplay, switchMap, firstValueFrom, - combineLatestWith, filter, timeout, - Subscription, + merge, + share, + ReplaySubject, + timer, tap, + throwError, + distinctUntilChanged, + withLatestFrom, } from "rxjs"; import { AccountService } from "../../../auth/abstractions/account.service"; -import { EncryptService } from "../../abstractions/encrypt.service"; +import { UserId } from "../../../types/guid"; import { AbstractStorageService, ObservableStorageService, } from "../../abstractions/storage.service"; import { KeyDefinition, userKeyBuilder } from "../key-definition"; import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options"; -import { ActiveUserState, activeMarker } from "../user-state"; +import { ActiveUserState, CombinedState, activeMarker } from "../user-state"; import { getStoredValue } from "./util"; -const FAKE_DEFAULT = Symbol("fakeDefault"); +const FAKE = Symbol("fake"); export class DefaultActiveUserState implements ActiveUserState { [activeMarker]: true; - private formattedKey$: Observable; private updatePromise: Promise | null = null; - private storageUpdateSubscription: Subscription; - private activeAccountUpdateSubscription: Subscription; - private subscriberCount = new BehaviorSubject(0); - private stateObservable: Observable; - private reinitialize = false; - protected stateSubject: BehaviorSubject = new BehaviorSubject< - T | typeof FAKE_DEFAULT - >(FAKE_DEFAULT); - private stateSubject$ = this.stateSubject.asObservable(); + private activeUserId$: Observable; - get state$() { - this.stateObservable = this.stateObservable ?? this.initializeObservable(); - return this.stateObservable; - } + combinedState$: Observable>; + state$: Observable; constructor( protected keyDefinition: KeyDefinition, private accountService: AccountService, - private encryptService: EncryptService, private chosenStorageLocation: AbstractStorageService & ObservableStorageService, ) { - this.formattedKey$ = this.accountService.activeAccount$.pipe( - map((account) => - account != null && account.id != null - ? userKeyBuilder(account.id, this.keyDefinition) - : null, - ), - tap(() => { - // We have a new key, so we should forget about previous update promises - this.updatePromise = null; - }), - shareReplay({ bufferSize: 1, refCount: false }), + this.activeUserId$ = this.accountService.activeAccount$.pipe( + // We only care about the UserId but we do want to know about no user as well. + map((a) => a?.id), + // To avoid going to storage when we don't need to, only get updates when there is a true change. + distinctUntilChanged(), ); + + const userChangeAndInitial$ = this.activeUserId$.pipe( + // If the user has changed, we no longer need to lock an update call + // since that call will be for a user that is no longer active. + tap(() => (this.updatePromise = null)), + switchMap(async (userId) => { + // We've switched or started off with no active user. So, + // emit a fake value so that we can fill our share buffer. + if (userId == null) { + return FAKE; + } + + const fullKey = userKeyBuilder(userId, this.keyDefinition); + const data = await getStoredValue( + fullKey, + this.chosenStorageLocation, + this.keyDefinition.deserializer, + ); + return [userId, data] as CombinedState; + }), + ); + + const latestStorage$ = this.chosenStorageLocation.updates$.pipe( + // Use withLatestFrom so that we do NOT emit when activeUserId changes because that + // is taken care of above, but we do want to have the latest user id + // when we get a storage update so we can filter the full key + withLatestFrom( + this.activeUserId$.pipe( + // Null userId is already taken care of through the userChange observable above + filter((u) => u != null), + // Take the userId and build the fullKey that we can now create + map((userId) => [userId, userKeyBuilder(userId, this.keyDefinition)] as const), + ), + ), + // Filter to only storage updates that pertain to our key + filter(([storageUpdate, [_userId, fullKey]]) => storageUpdate.key === fullKey), + switchMap(async ([storageUpdate, [userId, fullKey]]) => { + // We can shortcut on updateType of "remove" + // and just emit null. + if (storageUpdate.updateType === "remove") { + return [userId, null] as CombinedState; + } + + return [ + userId, + await getStoredValue( + fullKey, + this.chosenStorageLocation, + this.keyDefinition.deserializer, + ), + ] as CombinedState; + }), + ); + + this.combinedState$ = merge(userChangeAndInitial$, latestStorage$).pipe( + share({ + connector: () => new ReplaySubject | typeof FAKE>(1), + resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs), + }), + // Filter out FAKE AFTER the share so that we can fill the ReplaySubjects + // buffer with something and avoid emitting when there is no active user. + filter>((d) => d !== (FAKE as unknown)), + ); + + // State should just be combined state without the user id + this.state$ = this.combinedState$.pipe(map(([_userId, state]) => state)); } async update( @@ -83,18 +134,11 @@ export class DefaultActiveUserState implements ActiveUserState { } } - // TODO: this should be removed - async getFromState(): Promise { - const key = await this.createKey(); - return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); - } - private async internalUpdate( configureState: (state: T, dependency: TCombine) => T, options: StateUpdateOptions, ) { - const key = await this.createKey(); - const currentState = await this.getStateForUpdate(key); + const [key, currentState] = await this.getStateForUpdate(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -109,110 +153,22 @@ export class DefaultActiveUserState implements ActiveUserState { return newState; } - private initializeObservable() { - this.storageUpdateSubscription = this.chosenStorageLocation.updates$ - .pipe( - combineLatestWith(this.formattedKey$), - filter(([update, key]) => key !== null && update.key === key), - switchMap(async ([update, key]) => { - if (update.updateType === "remove") { - return null; - } - return await this.getState(key); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.activeAccountUpdateSubscription = this.formattedKey$ - .pipe( - switchMap(async (key) => { - if (key == null) { - return FAKE_DEFAULT; - } - return await this.getState(key); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.subscriberCount.subscribe((count) => { - if (count === 0 && this.stateObservable != null) { - this.triggerCleanup(); - } - }); - - return new Observable((subscriber) => { - this.incrementSubscribers(); - - // reinitialize listeners after cleanup - if (this.reinitialize) { - this.reinitialize = false; - this.initializeObservable(); - } - - const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); - subscriber.unsubscribe = () => { - this.decrementSubscribers(); - prevUnsubscribe(); - }; - - return this.stateSubject - .pipe( - // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. - filter((i) => i !== FAKE_DEFAULT), - ) - .subscribe(subscriber); - }); - } - - protected async createKey(): Promise { - const formattedKey = await firstValueFrom(this.formattedKey$); - if (formattedKey == null) { - throw new Error("Cannot create a key while there is no active user."); - } - return formattedKey; - } - /** For use in update methods, does not wait for update to complete before yielding state. * The expectation is that that await is already done */ - protected async getStateForUpdate(key: string) { - const currentValue = this.stateSubject.getValue(); - return currentValue === FAKE_DEFAULT - ? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer) - : currentValue; - } - - /** To be used in observables. Awaits updates to ensure they are complete */ - private async getState(key: string): Promise { - if (this.updatePromise != null) { - await this.updatePromise; - } - return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); + protected async getStateForUpdate() { + const [userId, data] = await firstValueFrom( + this.combinedState$.pipe( + timeout({ + first: 1000, + with: () => throwError(() => new Error("No active user at this time.")), + }), + ), + ); + return [userKeyBuilder(userId, this.keyDefinition), data] as const; } protected saveToStorage(key: string, data: T): Promise { return this.chosenStorageLocation.save(key, data); } - - private incrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value + 1); - } - - private decrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value - 1); - } - - private triggerCleanup() { - setTimeout(() => { - if (this.subscriberCount.value === 0) { - this.updatePromise = null; - this.storageUpdateSubscription?.unsubscribe(); - this.activeAccountUpdateSubscription?.unsubscribe(); - this.subscriberCount.complete(); - this.subscriberCount = new BehaviorSubject(0); - this.stateSubject.next(FAKE_DEFAULT); - this.reinitialize = true; - } - }, this.keyDefinition.cleanupDelayMs); - } } diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.provider.ts b/libs/common/src/platform/state/implementations/default-single-user-state.provider.ts index 1a1352145d..5b998ed6c8 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.provider.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.provider.ts @@ -1,5 +1,4 @@ import { UserId } from "../../../types/guid"; -import { EncryptService } from "../../abstractions/encrypt.service"; import { AbstractMemoryStorageService, AbstractStorageService, @@ -16,7 +15,6 @@ export class DefaultSingleUserStateProvider implements SingleUserStateProvider { private cache: Record> = {}; constructor( - protected encryptService: EncryptService, protected memoryStorage: AbstractMemoryStorageService & ObservableStorageService, protected diskStorage: AbstractStorageService & ObservableStorageService, ) {} @@ -42,7 +40,6 @@ export class DefaultSingleUserStateProvider implements SingleUserStateProvider { return new DefaultSingleUserState( userId, keyDefinition, - this.encryptService, this.getLocation(keyDefinition.stateDefinition.storageLocation), ); } diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts index 715b770b2a..0a8412742d 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts @@ -3,7 +3,6 @@ * @jest-environment ../shared/test.environment.ts */ -import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -46,12 +45,7 @@ describe("DefaultSingleUserState", () => { beforeEach(() => { diskStorageService = new FakeStorageService(); - userState = new DefaultSingleUserState( - userId, - testKeyDefinition, - null, // Not testing anything with encrypt service - diskStorageService, - ); + userState = new DefaultSingleUserState(userId, testKeyDefinition, diskStorageService); }); afterEach(() => { @@ -74,7 +68,12 @@ describe("DefaultSingleUserState", () => { const emissions = trackEmissions(userState.state$); await diskStorageService.save("wrong_key", newData); - expect(emissions).toHaveLength(0); + // Give userState a chance to emit it's initial value + // as well as wrongly emit the different key. + await awaitAsync(); + + // Just the initial value + expect(emissions).toEqual([null]); }); it("should emit initial storage value on first subscribe", async () => { @@ -94,6 +93,50 @@ describe("DefaultSingleUserState", () => { }); }); + describe("combinedState$", () => { + it("should emit when storage updates", async () => { + const emissions = trackEmissions(userState.combinedState$); + await diskStorageService.save(userKey, newData); + await awaitAsync(); + + expect(emissions).toEqual([ + [userId, null], // Initial value + [userId, newData], + ]); + }); + + it("should not emit when update key does not match", async () => { + const emissions = trackEmissions(userState.combinedState$); + await diskStorageService.save("wrong_key", newData); + + // Give userState a chance to emit it's initial value + // as well as wrongly emit the different key. + await awaitAsync(); + + // Just the initial value + expect(emissions).toHaveLength(1); + }); + + it("should emit initial storage value on first subscribe", async () => { + const initialStorage: Record = {}; + initialStorage[userKey] = TestState.fromJSON({ + date: "2022-09-21T13:14:17.648Z", + }); + diskStorageService.internalUpdateStore(initialStorage); + + const combinedState = await firstValueFrom(userState.combinedState$); + expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1); + expect(diskStorageService.mock.get).toHaveBeenCalledWith( + `user_${userId}_fake_fake`, + undefined, + ); + expect(combinedState).toBeTruthy(); + const [stateUserId, state] = combinedState; + expect(stateUserId).toBe(userId); + expect(state).toBe(initialStorage[userKey]); + }); + }); + describe("update", () => { it("should save on update", async () => { const result = await userState.update((state) => { @@ -309,7 +352,6 @@ describe("DefaultSingleUserState", () => { }); test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { - expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT const val = await userState.update((state) => { return newData; }); @@ -322,14 +364,8 @@ describe("DefaultSingleUserState", () => { }); describe("cleanup", () => { - async function assertClean() { - const emissions = trackEmissions(userState["stateSubject"]); - const initial = structuredClone(emissions); - - diskStorageService.save(userKey, newData); - await awaitAsync(); // storage updates are behind a promise - - expect(emissions).toEqual(initial); // no longer listening to storage updates + function assertClean() { + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0); } it("should cleanup after last subscriber", async () => { @@ -337,11 +373,9 @@ describe("DefaultSingleUserState", () => { await awaitAsync(); // storage updates are behind a promise subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); + assertClean(); }); it("should not cleanup if there are still subscribers", async () => { @@ -355,7 +389,7 @@ describe("DefaultSingleUserState", () => { // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - expect(userState["subscriberCount"].getValue()).toBe(1); + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1); // Still be listening to storage updates diskStorageService.save(userKey, newData); @@ -366,7 +400,7 @@ describe("DefaultSingleUserState", () => { // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - await assertClean(); + assertClean(); }); it("can re-initialize after cleanup", async () => { @@ -394,12 +428,15 @@ describe("DefaultSingleUserState", () => { await awaitAsync(); subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); // Do not wait long enough for cleanup await awaitAsync(cleanupDelayMs / 2); - expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared - expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + const value = await firstValueFrom(userState.state$); + expect(value).toEqual(newData); + + // Should be called once for the initial subscription and a second time during the save + // but should not be called for a second subscription if the cleanup hasn't happened yet. + expect(diskStorageService.mock.get).toHaveBeenCalledTimes(2); }); it("state$ observables are durable to cleanup", async () => { diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.ts b/libs/common/src/platform/state/implementations/default-single-user-state.ts index 84940493e7..13541c6c4b 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.ts @@ -1,51 +1,69 @@ import { - BehaviorSubject, Observable, - Subscription, + ReplaySubject, + combineLatest, + defer, filter, firstValueFrom, + merge, + of, + share, switchMap, timeout, + timer, } from "rxjs"; import { UserId } from "../../../types/guid"; -import { EncryptService } from "../../abstractions/encrypt.service"; import { AbstractStorageService, ObservableStorageService, } from "../../abstractions/storage.service"; import { KeyDefinition, userKeyBuilder } from "../key-definition"; import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options"; -import { SingleUserState } from "../user-state"; +import { CombinedState, SingleUserState } from "../user-state"; import { getStoredValue } from "./util"; -const FAKE_DEFAULT = Symbol("fakeDefault"); - export class DefaultSingleUserState implements SingleUserState { private storageKey: string; private updatePromise: Promise | null = null; - private storageUpdateSubscription: Subscription; - private subscriberCount = new BehaviorSubject(0); - private stateObservable: Observable; - private reinitialize = false; - protected stateSubject: BehaviorSubject = new BehaviorSubject< - T | typeof FAKE_DEFAULT - >(FAKE_DEFAULT); - - get state$() { - this.stateObservable = this.stateObservable ?? this.initializeObservable(); - return this.stateObservable; - } + state$: Observable; + combinedState$: Observable>; constructor( readonly userId: UserId, private keyDefinition: KeyDefinition, - private encryptService: EncryptService, private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = userKeyBuilder(this.userId, this.keyDefinition); + const initialStorageGet$ = defer(() => { + return getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer); + }); + + const latestStorage$ = chosenLocation.updates$.pipe( + filter((s) => s.key === this.storageKey), + switchMap(async (storageUpdate) => { + if (storageUpdate.updateType === "remove") { + return null; + } + + return await getStoredValue( + this.storageKey, + this.chosenLocation, + this.keyDefinition.deserializer, + ); + }), + ); + + this.state$ = merge(initialStorageGet$, latestStorage$).pipe( + share({ + connector: () => new ReplaySubject(1), + resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs), + }), + ); + + this.combinedState$ = combineLatest([of(userId), this.state$]); } async update( @@ -85,94 +103,10 @@ export class DefaultSingleUserState implements SingleUserState { return newState; } - private initializeObservable() { - this.storageUpdateSubscription = this.chosenLocation.updates$ - .pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await this.getFromState(); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.subscriberCount.subscribe((count) => { - if (count === 0 && this.stateObservable != null) { - this.triggerCleanup(); - } - }); - - // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to - // trigger populating it immediately. - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return new Observable((subscriber) => { - this.incrementSubscribers(); - - // reinitialize listeners after cleanup - if (this.reinitialize) { - this.reinitialize = false; - this.initializeObservable(); - } - - const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); - subscriber.unsubscribe = () => { - this.decrementSubscribers(); - prevUnsubscribe(); - }; - - return this.stateSubject - .pipe( - // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. - filter((i) => i != FAKE_DEFAULT), - ) - .subscribe(subscriber); - }); - } - /** For use in update methods, does not wait for update to complete before yielding state. * The expectation is that that await is already done */ private async getStateForUpdate() { - const currentValue = this.stateSubject.getValue(); - return currentValue === FAKE_DEFAULT - ? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer) - : currentValue; - } - - async getFromState(): Promise { - if (this.updatePromise != null) { - return await this.updatePromise; - } - return await getStoredValue( - this.storageKey, - this.chosenLocation, - this.keyDefinition.deserializer, - ); - } - - private incrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value + 1); - } - - private decrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value - 1); - } - - private triggerCleanup() { - setTimeout(() => { - if (this.subscriberCount.value === 0) { - this.updatePromise = null; - this.storageUpdateSubscription.unsubscribe(); - this.subscriberCount.complete(); - this.subscriberCount = new BehaviorSubject(0); - this.stateSubject.next(FAKE_DEFAULT); - this.reinitialize = true; - } - }, this.keyDefinition.cleanupDelayMs); + return await firstValueFrom(this.state$); } } diff --git a/libs/common/src/platform/state/user-state.ts b/libs/common/src/platform/state/user-state.ts index 7b5ab8a2fd..595d1fcf02 100644 --- a/libs/common/src/platform/state/user-state.ts +++ b/libs/common/src/platform/state/user-state.ts @@ -4,12 +4,22 @@ import { UserId } from "../../types/guid"; import { StateUpdateOptions } from "./state-update-options"; +export type CombinedState = readonly [userId: UserId, state: T]; + /** * A helper object for interacting with state that is scoped to a specific user. */ export interface UserState { + /** + * Emits a stream of data. + */ readonly state$: Observable; - readonly getFromState: () => Promise; + + /** + * Emits a stream of data alongside the user id the data corresponds to. + */ + readonly combinedState$: Observable>; + /** * Updates backing stores for the active user. * @param configureState function that takes the current state and returns the new state