mirror of
https://github.com/bitwarden/browser.git
synced 2024-12-22 16:29:09 +01:00
Combined State (#7383)
* Introduce Combined State * Cleanup Test * Update Fakes * Address PR Feedback * Update libs/common/src/platform/state/implementations/default-active-user-state.ts Co-authored-by: Matt Gibson <mgibson@bitwarden.com> * Prettier * Get rid of ReplaySubject reference --------- Co-authored-by: Matt Gibson <mgibson@bitwarden.com>
This commit is contained in:
parent
312197b8c7
commit
5e11cb212d
@ -321,7 +321,6 @@ export default class MainBackground {
|
|||||||
: new EncryptServiceImplementation(this.cryptoFunctionService, this.logService, true);
|
: new EncryptServiceImplementation(this.cryptoFunctionService, this.logService, true);
|
||||||
|
|
||||||
this.singleUserStateProvider = new DefaultSingleUserStateProvider(
|
this.singleUserStateProvider = new DefaultSingleUserStateProvider(
|
||||||
this.encryptService,
|
|
||||||
this.memoryStorageService as BackgroundMemoryStorageService,
|
this.memoryStorageService as BackgroundMemoryStorageService,
|
||||||
this.storageService as BrowserLocalStorageService,
|
this.storageService as BrowserLocalStorageService,
|
||||||
);
|
);
|
||||||
@ -332,7 +331,6 @@ export default class MainBackground {
|
|||||||
);
|
);
|
||||||
this.activeUserStateProvider = new DefaultActiveUserStateProvider(
|
this.activeUserStateProvider = new DefaultActiveUserStateProvider(
|
||||||
this.accountService,
|
this.accountService,
|
||||||
this.encryptService,
|
|
||||||
this.memoryStorageService as BackgroundMemoryStorageService,
|
this.memoryStorageService as BackgroundMemoryStorageService,
|
||||||
this.storageService as BrowserLocalStorageService,
|
this.storageService as BrowserLocalStorageService,
|
||||||
);
|
);
|
||||||
|
@ -7,7 +7,6 @@ import {
|
|||||||
accountServiceFactory,
|
accountServiceFactory,
|
||||||
} from "../../../auth/background/service-factories/account-service.factory";
|
} from "../../../auth/background/service-factories/account-service.factory";
|
||||||
|
|
||||||
import { EncryptServiceInitOptions, encryptServiceFactory } from "./encrypt-service.factory";
|
|
||||||
import { CachedServices, FactoryOptions, factory } from "./factory-options";
|
import { CachedServices, FactoryOptions, factory } from "./factory-options";
|
||||||
import {
|
import {
|
||||||
DiskStorageServiceInitOptions,
|
DiskStorageServiceInitOptions,
|
||||||
@ -20,7 +19,6 @@ type ActiveUserStateProviderFactory = FactoryOptions;
|
|||||||
|
|
||||||
export type ActiveUserStateProviderInitOptions = ActiveUserStateProviderFactory &
|
export type ActiveUserStateProviderInitOptions = ActiveUserStateProviderFactory &
|
||||||
AccountServiceInitOptions &
|
AccountServiceInitOptions &
|
||||||
EncryptServiceInitOptions &
|
|
||||||
MemoryStorageServiceInitOptions &
|
MemoryStorageServiceInitOptions &
|
||||||
DiskStorageServiceInitOptions;
|
DiskStorageServiceInitOptions;
|
||||||
|
|
||||||
@ -35,7 +33,6 @@ export async function activeUserStateProviderFactory(
|
|||||||
async () =>
|
async () =>
|
||||||
new DefaultActiveUserStateProvider(
|
new DefaultActiveUserStateProvider(
|
||||||
await accountServiceFactory(cache, opts),
|
await accountServiceFactory(cache, opts),
|
||||||
await encryptServiceFactory(cache, opts),
|
|
||||||
await observableMemoryStorageServiceFactory(cache, opts),
|
await observableMemoryStorageServiceFactory(cache, opts),
|
||||||
await observableDiskStorageServiceFactory(cache, opts),
|
await observableDiskStorageServiceFactory(cache, opts),
|
||||||
),
|
),
|
||||||
|
@ -2,7 +2,6 @@ import { SingleUserStateProvider } from "@bitwarden/common/platform/state";
|
|||||||
// eslint-disable-next-line import/no-restricted-paths -- We need the implementation to inject, but generally this should not be accessed
|
// eslint-disable-next-line import/no-restricted-paths -- We need the implementation to inject, but generally this should not be accessed
|
||||||
import { DefaultSingleUserStateProvider } from "@bitwarden/common/platform/state/implementations/default-single-user-state.provider";
|
import { DefaultSingleUserStateProvider } from "@bitwarden/common/platform/state/implementations/default-single-user-state.provider";
|
||||||
|
|
||||||
import { EncryptServiceInitOptions, encryptServiceFactory } from "./encrypt-service.factory";
|
|
||||||
import { CachedServices, FactoryOptions, factory } from "./factory-options";
|
import { CachedServices, FactoryOptions, factory } from "./factory-options";
|
||||||
import {
|
import {
|
||||||
DiskStorageServiceInitOptions,
|
DiskStorageServiceInitOptions,
|
||||||
@ -14,7 +13,6 @@ import {
|
|||||||
type SingleUserStateProviderFactoryOptions = FactoryOptions;
|
type SingleUserStateProviderFactoryOptions = FactoryOptions;
|
||||||
|
|
||||||
export type SingleUserStateProviderInitOptions = SingleUserStateProviderFactoryOptions &
|
export type SingleUserStateProviderInitOptions = SingleUserStateProviderFactoryOptions &
|
||||||
EncryptServiceInitOptions &
|
|
||||||
MemoryStorageServiceInitOptions &
|
MemoryStorageServiceInitOptions &
|
||||||
DiskStorageServiceInitOptions;
|
DiskStorageServiceInitOptions;
|
||||||
|
|
||||||
@ -28,7 +26,6 @@ export async function singleUserStateProviderFactory(
|
|||||||
opts,
|
opts,
|
||||||
async () =>
|
async () =>
|
||||||
new DefaultSingleUserStateProvider(
|
new DefaultSingleUserStateProvider(
|
||||||
await encryptServiceFactory(cache, opts),
|
|
||||||
await observableMemoryStorageServiceFactory(cache, opts),
|
await observableMemoryStorageServiceFactory(cache, opts),
|
||||||
await observableDiskStorageServiceFactory(cache, opts),
|
await observableDiskStorageServiceFactory(cache, opts),
|
||||||
),
|
),
|
||||||
|
@ -226,7 +226,6 @@ export class Main {
|
|||||||
);
|
);
|
||||||
|
|
||||||
this.singleUserStateProvider = new DefaultSingleUserStateProvider(
|
this.singleUserStateProvider = new DefaultSingleUserStateProvider(
|
||||||
this.encryptService,
|
|
||||||
this.memoryStorageService,
|
this.memoryStorageService,
|
||||||
this.storageService,
|
this.storageService,
|
||||||
);
|
);
|
||||||
@ -241,7 +240,6 @@ export class Main {
|
|||||||
|
|
||||||
this.activeUserStateProvider = new DefaultActiveUserStateProvider(
|
this.activeUserStateProvider = new DefaultActiveUserStateProvider(
|
||||||
this.accountService,
|
this.accountService,
|
||||||
this.encryptService,
|
|
||||||
this.memoryStorageService,
|
this.memoryStorageService,
|
||||||
this.storageService,
|
this.storageService,
|
||||||
);
|
);
|
||||||
|
@ -800,17 +800,12 @@ import { ModalService } from "./modal.service";
|
|||||||
{
|
{
|
||||||
provide: ActiveUserStateProvider,
|
provide: ActiveUserStateProvider,
|
||||||
useClass: DefaultActiveUserStateProvider,
|
useClass: DefaultActiveUserStateProvider,
|
||||||
deps: [
|
deps: [AccountServiceAbstraction, OBSERVABLE_MEMORY_STORAGE, OBSERVABLE_DISK_STORAGE],
|
||||||
AccountServiceAbstraction,
|
|
||||||
EncryptService,
|
|
||||||
OBSERVABLE_MEMORY_STORAGE,
|
|
||||||
OBSERVABLE_DISK_STORAGE,
|
|
||||||
],
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
provide: SingleUserStateProvider,
|
provide: SingleUserStateProvider,
|
||||||
useClass: DefaultSingleUserStateProvider,
|
useClass: DefaultSingleUserStateProvider,
|
||||||
deps: [EncryptService, OBSERVABLE_MEMORY_STORAGE, OBSERVABLE_DISK_STORAGE],
|
deps: [OBSERVABLE_MEMORY_STORAGE, OBSERVABLE_DISK_STORAGE],
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
provide: DerivedStateProvider,
|
provide: DerivedStateProvider,
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
import { ReplaySubject, firstValueFrom, timeout } from "rxjs";
|
import { Observable, ReplaySubject, firstValueFrom, map, timeout } from "rxjs";
|
||||||
|
|
||||||
import { DerivedState, GlobalState, SingleUserState, ActiveUserState } from "../src/platform/state";
|
import { DerivedState, GlobalState, SingleUserState, ActiveUserState } from "../src/platform/state";
|
||||||
// eslint-disable-next-line import/no-restricted-paths -- using unexposed options for clean typing in test class
|
// eslint-disable-next-line import/no-restricted-paths -- using unexposed options for clean typing in test class
|
||||||
import { StateUpdateOptions } from "../src/platform/state/state-update-options";
|
import { StateUpdateOptions } from "../src/platform/state/state-update-options";
|
||||||
// eslint-disable-next-line import/no-restricted-paths -- using unexposed options for clean typing in test class
|
// eslint-disable-next-line import/no-restricted-paths -- using unexposed options for clean typing in test class
|
||||||
import { UserState, activeMarker } from "../src/platform/state/user-state";
|
import { CombinedState, UserState, activeMarker } from "../src/platform/state/user-state";
|
||||||
import { UserId } from "../src/types/guid";
|
import { UserId } from "../src/types/guid";
|
||||||
|
|
||||||
const DEFAULT_TEST_OPTIONS: StateUpdateOptions<any, any> = {
|
const DEFAULT_TEST_OPTIONS: StateUpdateOptions<any, any> = {
|
||||||
@ -57,9 +57,19 @@ export class FakeGlobalState<T> implements GlobalState<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class FakeUserState<T> implements UserState<T> {
|
abstract class FakeUserState<T> implements UserState<T> {
|
||||||
// eslint-disable-next-line rxjs/no-exposed-subjects -- exposed for testing setup
|
// eslint-disable-next-line rxjs/no-exposed-subjects -- exposed for testing setup
|
||||||
stateSubject = new ReplaySubject<T>(1);
|
stateSubject = new ReplaySubject<CombinedState<T>>(1);
|
||||||
|
|
||||||
|
protected userId: UserId;
|
||||||
|
|
||||||
|
state$: Observable<T>;
|
||||||
|
combinedState$: Observable<CombinedState<T>>;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.combinedState$ = this.stateSubject.asObservable();
|
||||||
|
this.state$ = this.combinedState$.pipe(map(([_userId, state]) => state));
|
||||||
|
}
|
||||||
|
|
||||||
update: <TCombine>(
|
update: <TCombine>(
|
||||||
configureState: (state: T, dependency: TCombine) => T,
|
configureState: (state: T, dependency: TCombine) => T,
|
||||||
@ -75,34 +85,24 @@ export class FakeUserState<T> implements UserState<T> {
|
|||||||
return current;
|
return current;
|
||||||
}
|
}
|
||||||
const newState = configureState(current, combinedDependencies);
|
const newState = configureState(current, combinedDependencies);
|
||||||
this.stateSubject.next(newState);
|
this.stateSubject.next([this.userId, newState]);
|
||||||
return newState;
|
return newState;
|
||||||
});
|
});
|
||||||
|
|
||||||
updateMock = this.update as jest.MockedFunction<typeof this.update>;
|
updateMock = this.update as jest.MockedFunction<typeof this.update>;
|
||||||
|
|
||||||
updateFor: <TCombine>(
|
|
||||||
userId: UserId,
|
|
||||||
configureState: (state: T, dependency: TCombine) => T,
|
|
||||||
options?: StateUpdateOptions<T, TCombine>,
|
|
||||||
) => Promise<T> = jest.fn();
|
|
||||||
|
|
||||||
getFromState: () => Promise<T> = jest.fn(async () => {
|
|
||||||
return await firstValueFrom(this.state$.pipe(timeout(10)));
|
|
||||||
});
|
|
||||||
|
|
||||||
get state$() {
|
|
||||||
return this.stateSubject.asObservable();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class FakeSingleUserState<T> extends FakeUserState<T> implements SingleUserState<T> {
|
export class FakeSingleUserState<T> extends FakeUserState<T> implements SingleUserState<T> {
|
||||||
constructor(readonly userId: UserId) {
|
constructor(readonly userId: UserId) {
|
||||||
super();
|
super();
|
||||||
|
this.userId = userId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
export class FakeActiveUserState<T> extends FakeUserState<T> implements ActiveUserState<T> {
|
export class FakeActiveUserState<T> extends FakeUserState<T> implements ActiveUserState<T> {
|
||||||
[activeMarker]: true;
|
[activeMarker]: true;
|
||||||
|
changeActiveUser(userId: UserId) {
|
||||||
|
this.userId = userId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class FakeDerivedState<T> implements DerivedState<T> {
|
export class FakeDerivedState<T> implements DerivedState<T> {
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import { AccountService } from "../../../auth/abstractions/account.service";
|
import { AccountService } from "../../../auth/abstractions/account.service";
|
||||||
import { EncryptService } from "../../abstractions/encrypt.service";
|
|
||||||
import {
|
import {
|
||||||
AbstractMemoryStorageService,
|
AbstractMemoryStorageService,
|
||||||
AbstractStorageService,
|
AbstractStorageService,
|
||||||
@ -17,7 +16,6 @@ export class DefaultActiveUserStateProvider implements ActiveUserStateProvider {
|
|||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
protected accountService: AccountService,
|
protected accountService: AccountService,
|
||||||
protected encryptService: EncryptService,
|
|
||||||
protected memoryStorage: AbstractMemoryStorageService & ObservableStorageService,
|
protected memoryStorage: AbstractMemoryStorageService & ObservableStorageService,
|
||||||
protected diskStorage: AbstractStorageService & ObservableStorageService,
|
protected diskStorage: AbstractStorageService & ObservableStorageService,
|
||||||
) {}
|
) {}
|
||||||
@ -40,7 +38,6 @@ export class DefaultActiveUserStateProvider implements ActiveUserStateProvider {
|
|||||||
return new DefaultActiveUserState<T>(
|
return new DefaultActiveUserState<T>(
|
||||||
keyDefinition,
|
keyDefinition,
|
||||||
this.accountService,
|
this.accountService,
|
||||||
this.encryptService,
|
|
||||||
this.getLocation(keyDefinition.stateDefinition.storageLocation),
|
this.getLocation(keyDefinition.stateDefinition.storageLocation),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
* need to update test environment so trackEmissions works appropriately
|
* need to update test environment so trackEmissions works appropriately
|
||||||
* @jest-environment ../shared/test.environment.ts
|
* @jest-environment ../shared/test.environment.ts
|
||||||
*/
|
*/
|
||||||
import { any, anySymbol, mock } from "jest-mock-extended";
|
import { any, mock } from "jest-mock-extended";
|
||||||
import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs";
|
import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs";
|
||||||
import { Jsonify } from "type-fest";
|
import { Jsonify } from "type-fest";
|
||||||
|
|
||||||
@ -49,12 +49,7 @@ describe("DefaultActiveUserState", () => {
|
|||||||
accountService.activeAccount$ = activeAccountSubject;
|
accountService.activeAccount$ = activeAccountSubject;
|
||||||
|
|
||||||
diskStorageService = new FakeStorageService();
|
diskStorageService = new FakeStorageService();
|
||||||
userState = new DefaultActiveUserState(
|
userState = new DefaultActiveUserState(testKeyDefinition, accountService, diskStorageService);
|
||||||
testKeyDefinition,
|
|
||||||
accountService,
|
|
||||||
null, // Not testing anything with encrypt service
|
|
||||||
diskStorageService,
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const makeUserId = (id: string) => {
|
const makeUserId = (id: string) => {
|
||||||
@ -81,11 +76,11 @@ describe("DefaultActiveUserState", () => {
|
|||||||
const user2 = "user_00000000-0000-1000-a000-000000000002_fake_fake";
|
const user2 = "user_00000000-0000-1000-a000-000000000002_fake_fake";
|
||||||
const state1 = {
|
const state1 = {
|
||||||
date: new Date(2021, 0),
|
date: new Date(2021, 0),
|
||||||
array: ["value1"],
|
array: ["user1"],
|
||||||
};
|
};
|
||||||
const state2 = {
|
const state2 = {
|
||||||
date: new Date(2022, 0),
|
date: new Date(2022, 0),
|
||||||
array: ["value2"],
|
array: ["user2"],
|
||||||
};
|
};
|
||||||
const initialState: Record<string, TestState> = {};
|
const initialState: Record<string, TestState> = {};
|
||||||
initialState[user1] = state1;
|
initialState[user1] = state1;
|
||||||
@ -96,12 +91,11 @@ describe("DefaultActiveUserState", () => {
|
|||||||
|
|
||||||
// User signs in
|
// User signs in
|
||||||
await changeActiveUser("1");
|
await changeActiveUser("1");
|
||||||
await awaitAsync();
|
|
||||||
|
|
||||||
// Service does an update
|
// Service does an update
|
||||||
const updatedState = {
|
const updatedState = {
|
||||||
date: new Date(2023, 0),
|
date: new Date(2023, 0),
|
||||||
array: ["value3"],
|
array: ["user1-update"],
|
||||||
};
|
};
|
||||||
await userState.update(() => updatedState);
|
await userState.update(() => updatedState);
|
||||||
await awaitAsync();
|
await awaitAsync();
|
||||||
@ -109,6 +103,9 @@ describe("DefaultActiveUserState", () => {
|
|||||||
// Emulate an account switch
|
// Emulate an account switch
|
||||||
await changeActiveUser("2");
|
await changeActiveUser("2");
|
||||||
|
|
||||||
|
// #1 initial state from user1
|
||||||
|
// #2 updated state for user1
|
||||||
|
// #3 switched state to initial state for user2
|
||||||
expect(emissions).toEqual([state1, updatedState, state2]);
|
expect(emissions).toEqual([state1, updatedState, state2]);
|
||||||
|
|
||||||
// Should be called three time to get state, once for each user and once for the update
|
// Should be called three time to get state, once for each user and once for the update
|
||||||
@ -352,6 +349,35 @@ describe("DefaultActiveUserState", () => {
|
|||||||
newData,
|
newData,
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("should throw on an attempted update when there is no active user", async () => {
|
||||||
|
await changeActiveUser(undefined);
|
||||||
|
|
||||||
|
await expect(async () => await userState.update(() => null)).rejects.toThrow(
|
||||||
|
"No active user at this time.",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should throw on an attempted update where there is no active user even if there used to be one", async () => {
|
||||||
|
// Arrange
|
||||||
|
diskStorageService.internalUpdateStore({
|
||||||
|
"user_00000000-0000-1000-a000-000000000001_fake_fake": {
|
||||||
|
date: new Date(2019, 1),
|
||||||
|
array: [],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const [userId, state] = await firstValueFrom(userState.combinedState$);
|
||||||
|
expect(userId).toBe("00000000-0000-1000-a000-000000000001");
|
||||||
|
expect(state.date.getUTCFullYear()).toBe(2019);
|
||||||
|
|
||||||
|
await changeActiveUser(undefined);
|
||||||
|
// Act
|
||||||
|
|
||||||
|
expect(async () => await userState.update(() => null)).rejects.toThrow(
|
||||||
|
"No active user at this time.",
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("update races", () => {
|
describe("update races", () => {
|
||||||
@ -460,7 +486,7 @@ describe("DefaultActiveUserState", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
|
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
|
||||||
expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
|
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0);
|
||||||
const val = await userState.update((state) => {
|
const val = await userState.update((state) => {
|
||||||
return newData;
|
return newData;
|
||||||
});
|
});
|
||||||
@ -554,14 +580,9 @@ describe("DefaultActiveUserState", () => {
|
|||||||
userKey = userKeyBuilder(userId, testKeyDefinition);
|
userKey = userKeyBuilder(userId, testKeyDefinition);
|
||||||
});
|
});
|
||||||
|
|
||||||
async function assertClean() {
|
function assertClean() {
|
||||||
const emissions = trackEmissions(userState["stateSubject"]);
|
expect(activeAccountSubject["observers"]).toHaveLength(0);
|
||||||
const initial = structuredClone(emissions);
|
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0);
|
||||||
|
|
||||||
diskStorageService.save(userKey, newData);
|
|
||||||
await awaitAsync(); // storage updates are behind a promise
|
|
||||||
|
|
||||||
expect(emissions).toEqual(initial); // no longer listening to storage updates
|
|
||||||
}
|
}
|
||||||
|
|
||||||
it("should cleanup after last subscriber", async () => {
|
it("should cleanup after last subscriber", async () => {
|
||||||
@ -569,11 +590,11 @@ describe("DefaultActiveUserState", () => {
|
|||||||
await awaitAsync(); // storage updates are behind a promise
|
await awaitAsync(); // storage updates are behind a promise
|
||||||
|
|
||||||
subscription.unsubscribe();
|
subscription.unsubscribe();
|
||||||
expect(userState["subscriberCount"].getValue()).toBe(0);
|
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
|
||||||
// Wait for cleanup
|
// Wait for cleanup
|
||||||
await awaitAsync(cleanupDelayMs * 2);
|
await awaitAsync(cleanupDelayMs * 2);
|
||||||
|
|
||||||
await assertClean();
|
assertClean();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should not cleanup if there are still subscribers", async () => {
|
it("should not cleanup if there are still subscribers", async () => {
|
||||||
@ -587,7 +608,7 @@ describe("DefaultActiveUserState", () => {
|
|||||||
// Wait for cleanup
|
// Wait for cleanup
|
||||||
await awaitAsync(cleanupDelayMs * 2);
|
await awaitAsync(cleanupDelayMs * 2);
|
||||||
|
|
||||||
expect(userState["subscriberCount"].getValue()).toBe(1);
|
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
|
||||||
|
|
||||||
// Still be listening to storage updates
|
// Still be listening to storage updates
|
||||||
diskStorageService.save(userKey, newData);
|
diskStorageService.save(userKey, newData);
|
||||||
@ -598,7 +619,7 @@ describe("DefaultActiveUserState", () => {
|
|||||||
// Wait for cleanup
|
// Wait for cleanup
|
||||||
await awaitAsync(cleanupDelayMs * 2);
|
await awaitAsync(cleanupDelayMs * 2);
|
||||||
|
|
||||||
await assertClean();
|
assertClean();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("can re-initialize after cleanup", async () => {
|
it("can re-initialize after cleanup", async () => {
|
||||||
@ -612,7 +633,7 @@ describe("DefaultActiveUserState", () => {
|
|||||||
const emissions = trackEmissions(userState.state$);
|
const emissions = trackEmissions(userState.state$);
|
||||||
await awaitAsync();
|
await awaitAsync();
|
||||||
|
|
||||||
diskStorageService.save(userKey, newData);
|
await diskStorageService.save(userKey, newData);
|
||||||
await awaitAsync();
|
await awaitAsync();
|
||||||
|
|
||||||
expect(emissions).toEqual([null, newData]);
|
expect(emissions).toEqual([null, newData]);
|
||||||
@ -626,12 +647,16 @@ describe("DefaultActiveUserState", () => {
|
|||||||
await awaitAsync();
|
await awaitAsync();
|
||||||
|
|
||||||
subscription.unsubscribe();
|
subscription.unsubscribe();
|
||||||
expect(userState["subscriberCount"].getValue()).toBe(0);
|
|
||||||
// Do not wait long enough for cleanup
|
// Do not wait long enough for cleanup
|
||||||
await awaitAsync(cleanupDelayMs / 2);
|
await awaitAsync(cleanupDelayMs / 2);
|
||||||
|
|
||||||
expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
|
const state = await firstValueFrom(userState.state$);
|
||||||
expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
|
|
||||||
|
expect(state).toEqual(newData); // digging in to check that it hasn't been cleared
|
||||||
|
|
||||||
|
// Should be called once for the initial subscription and once from the save
|
||||||
|
// but should NOT be called for the second subscription from the `firstValueFrom`
|
||||||
|
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(2);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("state$ observables are durable to cleanup", async () => {
|
it("state$ observables are durable to cleanup", async () => {
|
||||||
|
@ -1,69 +1,120 @@
|
|||||||
import {
|
import {
|
||||||
Observable,
|
Observable,
|
||||||
BehaviorSubject,
|
|
||||||
map,
|
map,
|
||||||
shareReplay,
|
|
||||||
switchMap,
|
switchMap,
|
||||||
firstValueFrom,
|
firstValueFrom,
|
||||||
combineLatestWith,
|
|
||||||
filter,
|
filter,
|
||||||
timeout,
|
timeout,
|
||||||
Subscription,
|
merge,
|
||||||
|
share,
|
||||||
|
ReplaySubject,
|
||||||
|
timer,
|
||||||
tap,
|
tap,
|
||||||
|
throwError,
|
||||||
|
distinctUntilChanged,
|
||||||
|
withLatestFrom,
|
||||||
} from "rxjs";
|
} from "rxjs";
|
||||||
|
|
||||||
import { AccountService } from "../../../auth/abstractions/account.service";
|
import { AccountService } from "../../../auth/abstractions/account.service";
|
||||||
import { EncryptService } from "../../abstractions/encrypt.service";
|
import { UserId } from "../../../types/guid";
|
||||||
import {
|
import {
|
||||||
AbstractStorageService,
|
AbstractStorageService,
|
||||||
ObservableStorageService,
|
ObservableStorageService,
|
||||||
} from "../../abstractions/storage.service";
|
} from "../../abstractions/storage.service";
|
||||||
import { KeyDefinition, userKeyBuilder } from "../key-definition";
|
import { KeyDefinition, userKeyBuilder } from "../key-definition";
|
||||||
import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options";
|
import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options";
|
||||||
import { ActiveUserState, activeMarker } from "../user-state";
|
import { ActiveUserState, CombinedState, activeMarker } from "../user-state";
|
||||||
|
|
||||||
import { getStoredValue } from "./util";
|
import { getStoredValue } from "./util";
|
||||||
|
|
||||||
const FAKE_DEFAULT = Symbol("fakeDefault");
|
const FAKE = Symbol("fake");
|
||||||
|
|
||||||
export class DefaultActiveUserState<T> implements ActiveUserState<T> {
|
export class DefaultActiveUserState<T> implements ActiveUserState<T> {
|
||||||
[activeMarker]: true;
|
[activeMarker]: true;
|
||||||
private formattedKey$: Observable<string>;
|
|
||||||
private updatePromise: Promise<T> | null = null;
|
private updatePromise: Promise<T> | null = null;
|
||||||
private storageUpdateSubscription: Subscription;
|
|
||||||
private activeAccountUpdateSubscription: Subscription;
|
|
||||||
private subscriberCount = new BehaviorSubject<number>(0);
|
|
||||||
private stateObservable: Observable<T>;
|
|
||||||
private reinitialize = false;
|
|
||||||
|
|
||||||
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
|
private activeUserId$: Observable<UserId | null>;
|
||||||
T | typeof FAKE_DEFAULT
|
|
||||||
>(FAKE_DEFAULT);
|
|
||||||
private stateSubject$ = this.stateSubject.asObservable();
|
|
||||||
|
|
||||||
get state$() {
|
combinedState$: Observable<CombinedState<T>>;
|
||||||
this.stateObservable = this.stateObservable ?? this.initializeObservable();
|
state$: Observable<T>;
|
||||||
return this.stateObservable;
|
|
||||||
}
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
protected keyDefinition: KeyDefinition<T>,
|
protected keyDefinition: KeyDefinition<T>,
|
||||||
private accountService: AccountService,
|
private accountService: AccountService,
|
||||||
private encryptService: EncryptService,
|
|
||||||
private chosenStorageLocation: AbstractStorageService & ObservableStorageService,
|
private chosenStorageLocation: AbstractStorageService & ObservableStorageService,
|
||||||
) {
|
) {
|
||||||
this.formattedKey$ = this.accountService.activeAccount$.pipe(
|
this.activeUserId$ = this.accountService.activeAccount$.pipe(
|
||||||
map((account) =>
|
// We only care about the UserId but we do want to know about no user as well.
|
||||||
account != null && account.id != null
|
map((a) => a?.id),
|
||||||
? userKeyBuilder(account.id, this.keyDefinition)
|
// To avoid going to storage when we don't need to, only get updates when there is a true change.
|
||||||
: null,
|
distinctUntilChanged(),
|
||||||
),
|
|
||||||
tap(() => {
|
|
||||||
// We have a new key, so we should forget about previous update promises
|
|
||||||
this.updatePromise = null;
|
|
||||||
}),
|
|
||||||
shareReplay({ bufferSize: 1, refCount: false }),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const userChangeAndInitial$ = this.activeUserId$.pipe(
|
||||||
|
// If the user has changed, we no longer need to lock an update call
|
||||||
|
// since that call will be for a user that is no longer active.
|
||||||
|
tap(() => (this.updatePromise = null)),
|
||||||
|
switchMap(async (userId) => {
|
||||||
|
// We've switched or started off with no active user. So,
|
||||||
|
// emit a fake value so that we can fill our share buffer.
|
||||||
|
if (userId == null) {
|
||||||
|
return FAKE;
|
||||||
|
}
|
||||||
|
|
||||||
|
const fullKey = userKeyBuilder(userId, this.keyDefinition);
|
||||||
|
const data = await getStoredValue(
|
||||||
|
fullKey,
|
||||||
|
this.chosenStorageLocation,
|
||||||
|
this.keyDefinition.deserializer,
|
||||||
|
);
|
||||||
|
return [userId, data] as CombinedState<T>;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const latestStorage$ = this.chosenStorageLocation.updates$.pipe(
|
||||||
|
// Use withLatestFrom so that we do NOT emit when activeUserId changes because that
|
||||||
|
// is taken care of above, but we do want to have the latest user id
|
||||||
|
// when we get a storage update so we can filter the full key
|
||||||
|
withLatestFrom(
|
||||||
|
this.activeUserId$.pipe(
|
||||||
|
// Null userId is already taken care of through the userChange observable above
|
||||||
|
filter((u) => u != null),
|
||||||
|
// Take the userId and build the fullKey that we can now create
|
||||||
|
map((userId) => [userId, userKeyBuilder(userId, this.keyDefinition)] as const),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
// Filter to only storage updates that pertain to our key
|
||||||
|
filter(([storageUpdate, [_userId, fullKey]]) => storageUpdate.key === fullKey),
|
||||||
|
switchMap(async ([storageUpdate, [userId, fullKey]]) => {
|
||||||
|
// We can shortcut on updateType of "remove"
|
||||||
|
// and just emit null.
|
||||||
|
if (storageUpdate.updateType === "remove") {
|
||||||
|
return [userId, null] as CombinedState<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
return [
|
||||||
|
userId,
|
||||||
|
await getStoredValue(
|
||||||
|
fullKey,
|
||||||
|
this.chosenStorageLocation,
|
||||||
|
this.keyDefinition.deserializer,
|
||||||
|
),
|
||||||
|
] as CombinedState<T>;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
this.combinedState$ = merge(userChangeAndInitial$, latestStorage$).pipe(
|
||||||
|
share({
|
||||||
|
connector: () => new ReplaySubject<CombinedState<T> | typeof FAKE>(1),
|
||||||
|
resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs),
|
||||||
|
}),
|
||||||
|
// Filter out FAKE AFTER the share so that we can fill the ReplaySubjects
|
||||||
|
// buffer with something and avoid emitting when there is no active user.
|
||||||
|
filter<CombinedState<T>>((d) => d !== (FAKE as unknown)),
|
||||||
|
);
|
||||||
|
|
||||||
|
// State should just be combined state without the user id
|
||||||
|
this.state$ = this.combinedState$.pipe(map(([_userId, state]) => state));
|
||||||
}
|
}
|
||||||
|
|
||||||
async update<TCombine>(
|
async update<TCombine>(
|
||||||
@ -83,18 +134,11 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this should be removed
|
|
||||||
async getFromState(): Promise<T> {
|
|
||||||
const key = await this.createKey();
|
|
||||||
return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async internalUpdate<TCombine>(
|
private async internalUpdate<TCombine>(
|
||||||
configureState: (state: T, dependency: TCombine) => T,
|
configureState: (state: T, dependency: TCombine) => T,
|
||||||
options: StateUpdateOptions<T, TCombine>,
|
options: StateUpdateOptions<T, TCombine>,
|
||||||
) {
|
) {
|
||||||
const key = await this.createKey();
|
const [key, currentState] = await this.getStateForUpdate();
|
||||||
const currentState = await this.getStateForUpdate(key);
|
|
||||||
const combinedDependencies =
|
const combinedDependencies =
|
||||||
options.combineLatestWith != null
|
options.combineLatestWith != null
|
||||||
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
|
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
|
||||||
@ -109,110 +153,22 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
|
|||||||
return newState;
|
return newState;
|
||||||
}
|
}
|
||||||
|
|
||||||
private initializeObservable() {
|
|
||||||
this.storageUpdateSubscription = this.chosenStorageLocation.updates$
|
|
||||||
.pipe(
|
|
||||||
combineLatestWith(this.formattedKey$),
|
|
||||||
filter(([update, key]) => key !== null && update.key === key),
|
|
||||||
switchMap(async ([update, key]) => {
|
|
||||||
if (update.updateType === "remove") {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return await this.getState(key);
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
.subscribe((v) => this.stateSubject.next(v));
|
|
||||||
|
|
||||||
this.activeAccountUpdateSubscription = this.formattedKey$
|
|
||||||
.pipe(
|
|
||||||
switchMap(async (key) => {
|
|
||||||
if (key == null) {
|
|
||||||
return FAKE_DEFAULT;
|
|
||||||
}
|
|
||||||
return await this.getState(key);
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
.subscribe((v) => this.stateSubject.next(v));
|
|
||||||
|
|
||||||
this.subscriberCount.subscribe((count) => {
|
|
||||||
if (count === 0 && this.stateObservable != null) {
|
|
||||||
this.triggerCleanup();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return new Observable<T>((subscriber) => {
|
|
||||||
this.incrementSubscribers();
|
|
||||||
|
|
||||||
// reinitialize listeners after cleanup
|
|
||||||
if (this.reinitialize) {
|
|
||||||
this.reinitialize = false;
|
|
||||||
this.initializeObservable();
|
|
||||||
}
|
|
||||||
|
|
||||||
const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
|
|
||||||
subscriber.unsubscribe = () => {
|
|
||||||
this.decrementSubscribers();
|
|
||||||
prevUnsubscribe();
|
|
||||||
};
|
|
||||||
|
|
||||||
return this.stateSubject
|
|
||||||
.pipe(
|
|
||||||
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
|
|
||||||
filter((i) => i !== FAKE_DEFAULT),
|
|
||||||
)
|
|
||||||
.subscribe(subscriber);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
protected async createKey(): Promise<string> {
|
|
||||||
const formattedKey = await firstValueFrom(this.formattedKey$);
|
|
||||||
if (formattedKey == null) {
|
|
||||||
throw new Error("Cannot create a key while there is no active user.");
|
|
||||||
}
|
|
||||||
return formattedKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** For use in update methods, does not wait for update to complete before yielding state.
|
/** For use in update methods, does not wait for update to complete before yielding state.
|
||||||
* The expectation is that that await is already done
|
* The expectation is that that await is already done
|
||||||
*/
|
*/
|
||||||
protected async getStateForUpdate(key: string) {
|
protected async getStateForUpdate() {
|
||||||
const currentValue = this.stateSubject.getValue();
|
const [userId, data] = await firstValueFrom(
|
||||||
return currentValue === FAKE_DEFAULT
|
this.combinedState$.pipe(
|
||||||
? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer)
|
timeout({
|
||||||
: currentValue;
|
first: 1000,
|
||||||
}
|
with: () => throwError(() => new Error("No active user at this time.")),
|
||||||
|
}),
|
||||||
/** To be used in observables. Awaits updates to ensure they are complete */
|
),
|
||||||
private async getState(key: string): Promise<T> {
|
);
|
||||||
if (this.updatePromise != null) {
|
return [userKeyBuilder(userId, this.keyDefinition), data] as const;
|
||||||
await this.updatePromise;
|
|
||||||
}
|
|
||||||
return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected saveToStorage(key: string, data: T): Promise<void> {
|
protected saveToStorage(key: string, data: T): Promise<void> {
|
||||||
return this.chosenStorageLocation.save(key, data);
|
return this.chosenStorageLocation.save(key, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
private incrementSubscribers() {
|
|
||||||
this.subscriberCount.next(this.subscriberCount.value + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private decrementSubscribers() {
|
|
||||||
this.subscriberCount.next(this.subscriberCount.value - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private triggerCleanup() {
|
|
||||||
setTimeout(() => {
|
|
||||||
if (this.subscriberCount.value === 0) {
|
|
||||||
this.updatePromise = null;
|
|
||||||
this.storageUpdateSubscription?.unsubscribe();
|
|
||||||
this.activeAccountUpdateSubscription?.unsubscribe();
|
|
||||||
this.subscriberCount.complete();
|
|
||||||
this.subscriberCount = new BehaviorSubject<number>(0);
|
|
||||||
this.stateSubject.next(FAKE_DEFAULT);
|
|
||||||
this.reinitialize = true;
|
|
||||||
}
|
|
||||||
}, this.keyDefinition.cleanupDelayMs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import { UserId } from "../../../types/guid";
|
import { UserId } from "../../../types/guid";
|
||||||
import { EncryptService } from "../../abstractions/encrypt.service";
|
|
||||||
import {
|
import {
|
||||||
AbstractMemoryStorageService,
|
AbstractMemoryStorageService,
|
||||||
AbstractStorageService,
|
AbstractStorageService,
|
||||||
@ -16,7 +15,6 @@ export class DefaultSingleUserStateProvider implements SingleUserStateProvider {
|
|||||||
private cache: Record<string, SingleUserState<unknown>> = {};
|
private cache: Record<string, SingleUserState<unknown>> = {};
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
protected encryptService: EncryptService,
|
|
||||||
protected memoryStorage: AbstractMemoryStorageService & ObservableStorageService,
|
protected memoryStorage: AbstractMemoryStorageService & ObservableStorageService,
|
||||||
protected diskStorage: AbstractStorageService & ObservableStorageService,
|
protected diskStorage: AbstractStorageService & ObservableStorageService,
|
||||||
) {}
|
) {}
|
||||||
@ -42,7 +40,6 @@ export class DefaultSingleUserStateProvider implements SingleUserStateProvider {
|
|||||||
return new DefaultSingleUserState<T>(
|
return new DefaultSingleUserState<T>(
|
||||||
userId,
|
userId,
|
||||||
keyDefinition,
|
keyDefinition,
|
||||||
this.encryptService,
|
|
||||||
this.getLocation(keyDefinition.stateDefinition.storageLocation),
|
this.getLocation(keyDefinition.stateDefinition.storageLocation),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
* @jest-environment ../shared/test.environment.ts
|
* @jest-environment ../shared/test.environment.ts
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { anySymbol } from "jest-mock-extended";
|
|
||||||
import { firstValueFrom, of } from "rxjs";
|
import { firstValueFrom, of } from "rxjs";
|
||||||
import { Jsonify } from "type-fest";
|
import { Jsonify } from "type-fest";
|
||||||
|
|
||||||
@ -46,12 +45,7 @@ describe("DefaultSingleUserState", () => {
|
|||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
diskStorageService = new FakeStorageService();
|
diskStorageService = new FakeStorageService();
|
||||||
userState = new DefaultSingleUserState(
|
userState = new DefaultSingleUserState(userId, testKeyDefinition, diskStorageService);
|
||||||
userId,
|
|
||||||
testKeyDefinition,
|
|
||||||
null, // Not testing anything with encrypt service
|
|
||||||
diskStorageService,
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
@ -74,7 +68,12 @@ describe("DefaultSingleUserState", () => {
|
|||||||
const emissions = trackEmissions(userState.state$);
|
const emissions = trackEmissions(userState.state$);
|
||||||
await diskStorageService.save("wrong_key", newData);
|
await diskStorageService.save("wrong_key", newData);
|
||||||
|
|
||||||
expect(emissions).toHaveLength(0);
|
// Give userState a chance to emit it's initial value
|
||||||
|
// as well as wrongly emit the different key.
|
||||||
|
await awaitAsync();
|
||||||
|
|
||||||
|
// Just the initial value
|
||||||
|
expect(emissions).toEqual([null]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should emit initial storage value on first subscribe", async () => {
|
it("should emit initial storage value on first subscribe", async () => {
|
||||||
@ -94,6 +93,50 @@ describe("DefaultSingleUserState", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("combinedState$", () => {
|
||||||
|
it("should emit when storage updates", async () => {
|
||||||
|
const emissions = trackEmissions(userState.combinedState$);
|
||||||
|
await diskStorageService.save(userKey, newData);
|
||||||
|
await awaitAsync();
|
||||||
|
|
||||||
|
expect(emissions).toEqual([
|
||||||
|
[userId, null], // Initial value
|
||||||
|
[userId, newData],
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should not emit when update key does not match", async () => {
|
||||||
|
const emissions = trackEmissions(userState.combinedState$);
|
||||||
|
await diskStorageService.save("wrong_key", newData);
|
||||||
|
|
||||||
|
// Give userState a chance to emit it's initial value
|
||||||
|
// as well as wrongly emit the different key.
|
||||||
|
await awaitAsync();
|
||||||
|
|
||||||
|
// Just the initial value
|
||||||
|
expect(emissions).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should emit initial storage value on first subscribe", async () => {
|
||||||
|
const initialStorage: Record<string, TestState> = {};
|
||||||
|
initialStorage[userKey] = TestState.fromJSON({
|
||||||
|
date: "2022-09-21T13:14:17.648Z",
|
||||||
|
});
|
||||||
|
diskStorageService.internalUpdateStore(initialStorage);
|
||||||
|
|
||||||
|
const combinedState = await firstValueFrom(userState.combinedState$);
|
||||||
|
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1);
|
||||||
|
expect(diskStorageService.mock.get).toHaveBeenCalledWith(
|
||||||
|
`user_${userId}_fake_fake`,
|
||||||
|
undefined,
|
||||||
|
);
|
||||||
|
expect(combinedState).toBeTruthy();
|
||||||
|
const [stateUserId, state] = combinedState;
|
||||||
|
expect(stateUserId).toBe(userId);
|
||||||
|
expect(state).toBe(initialStorage[userKey]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("update", () => {
|
describe("update", () => {
|
||||||
it("should save on update", async () => {
|
it("should save on update", async () => {
|
||||||
const result = await userState.update((state) => {
|
const result = await userState.update((state) => {
|
||||||
@ -309,7 +352,6 @@ describe("DefaultSingleUserState", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
|
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
|
||||||
expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
|
|
||||||
const val = await userState.update((state) => {
|
const val = await userState.update((state) => {
|
||||||
return newData;
|
return newData;
|
||||||
});
|
});
|
||||||
@ -322,14 +364,8 @@ describe("DefaultSingleUserState", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("cleanup", () => {
|
describe("cleanup", () => {
|
||||||
async function assertClean() {
|
function assertClean() {
|
||||||
const emissions = trackEmissions(userState["stateSubject"]);
|
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0);
|
||||||
const initial = structuredClone(emissions);
|
|
||||||
|
|
||||||
diskStorageService.save(userKey, newData);
|
|
||||||
await awaitAsync(); // storage updates are behind a promise
|
|
||||||
|
|
||||||
expect(emissions).toEqual(initial); // no longer listening to storage updates
|
|
||||||
}
|
}
|
||||||
|
|
||||||
it("should cleanup after last subscriber", async () => {
|
it("should cleanup after last subscriber", async () => {
|
||||||
@ -337,11 +373,9 @@ describe("DefaultSingleUserState", () => {
|
|||||||
await awaitAsync(); // storage updates are behind a promise
|
await awaitAsync(); // storage updates are behind a promise
|
||||||
|
|
||||||
subscription.unsubscribe();
|
subscription.unsubscribe();
|
||||||
expect(userState["subscriberCount"].getValue()).toBe(0);
|
|
||||||
// Wait for cleanup
|
// Wait for cleanup
|
||||||
await awaitAsync(cleanupDelayMs * 2);
|
await awaitAsync(cleanupDelayMs * 2);
|
||||||
|
assertClean();
|
||||||
await assertClean();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should not cleanup if there are still subscribers", async () => {
|
it("should not cleanup if there are still subscribers", async () => {
|
||||||
@ -355,7 +389,7 @@ describe("DefaultSingleUserState", () => {
|
|||||||
// Wait for cleanup
|
// Wait for cleanup
|
||||||
await awaitAsync(cleanupDelayMs * 2);
|
await awaitAsync(cleanupDelayMs * 2);
|
||||||
|
|
||||||
expect(userState["subscriberCount"].getValue()).toBe(1);
|
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
|
||||||
|
|
||||||
// Still be listening to storage updates
|
// Still be listening to storage updates
|
||||||
diskStorageService.save(userKey, newData);
|
diskStorageService.save(userKey, newData);
|
||||||
@ -366,7 +400,7 @@ describe("DefaultSingleUserState", () => {
|
|||||||
// Wait for cleanup
|
// Wait for cleanup
|
||||||
await awaitAsync(cleanupDelayMs * 2);
|
await awaitAsync(cleanupDelayMs * 2);
|
||||||
|
|
||||||
await assertClean();
|
assertClean();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("can re-initialize after cleanup", async () => {
|
it("can re-initialize after cleanup", async () => {
|
||||||
@ -394,12 +428,15 @@ describe("DefaultSingleUserState", () => {
|
|||||||
await awaitAsync();
|
await awaitAsync();
|
||||||
|
|
||||||
subscription.unsubscribe();
|
subscription.unsubscribe();
|
||||||
expect(userState["subscriberCount"].getValue()).toBe(0);
|
|
||||||
// Do not wait long enough for cleanup
|
// Do not wait long enough for cleanup
|
||||||
await awaitAsync(cleanupDelayMs / 2);
|
await awaitAsync(cleanupDelayMs / 2);
|
||||||
|
|
||||||
expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
|
const value = await firstValueFrom(userState.state$);
|
||||||
expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
|
expect(value).toEqual(newData);
|
||||||
|
|
||||||
|
// Should be called once for the initial subscription and a second time during the save
|
||||||
|
// but should not be called for a second subscription if the cleanup hasn't happened yet.
|
||||||
|
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(2);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("state$ observables are durable to cleanup", async () => {
|
it("state$ observables are durable to cleanup", async () => {
|
||||||
|
@ -1,51 +1,69 @@
|
|||||||
import {
|
import {
|
||||||
BehaviorSubject,
|
|
||||||
Observable,
|
Observable,
|
||||||
Subscription,
|
ReplaySubject,
|
||||||
|
combineLatest,
|
||||||
|
defer,
|
||||||
filter,
|
filter,
|
||||||
firstValueFrom,
|
firstValueFrom,
|
||||||
|
merge,
|
||||||
|
of,
|
||||||
|
share,
|
||||||
switchMap,
|
switchMap,
|
||||||
timeout,
|
timeout,
|
||||||
|
timer,
|
||||||
} from "rxjs";
|
} from "rxjs";
|
||||||
|
|
||||||
import { UserId } from "../../../types/guid";
|
import { UserId } from "../../../types/guid";
|
||||||
import { EncryptService } from "../../abstractions/encrypt.service";
|
|
||||||
import {
|
import {
|
||||||
AbstractStorageService,
|
AbstractStorageService,
|
||||||
ObservableStorageService,
|
ObservableStorageService,
|
||||||
} from "../../abstractions/storage.service";
|
} from "../../abstractions/storage.service";
|
||||||
import { KeyDefinition, userKeyBuilder } from "../key-definition";
|
import { KeyDefinition, userKeyBuilder } from "../key-definition";
|
||||||
import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options";
|
import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options";
|
||||||
import { SingleUserState } from "../user-state";
|
import { CombinedState, SingleUserState } from "../user-state";
|
||||||
|
|
||||||
import { getStoredValue } from "./util";
|
import { getStoredValue } from "./util";
|
||||||
|
|
||||||
const FAKE_DEFAULT = Symbol("fakeDefault");
|
|
||||||
|
|
||||||
export class DefaultSingleUserState<T> implements SingleUserState<T> {
|
export class DefaultSingleUserState<T> implements SingleUserState<T> {
|
||||||
private storageKey: string;
|
private storageKey: string;
|
||||||
private updatePromise: Promise<T> | null = null;
|
private updatePromise: Promise<T> | null = null;
|
||||||
private storageUpdateSubscription: Subscription;
|
|
||||||
private subscriberCount = new BehaviorSubject<number>(0);
|
|
||||||
private stateObservable: Observable<T>;
|
|
||||||
private reinitialize = false;
|
|
||||||
|
|
||||||
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
|
state$: Observable<T>;
|
||||||
T | typeof FAKE_DEFAULT
|
combinedState$: Observable<CombinedState<T>>;
|
||||||
>(FAKE_DEFAULT);
|
|
||||||
|
|
||||||
get state$() {
|
|
||||||
this.stateObservable = this.stateObservable ?? this.initializeObservable();
|
|
||||||
return this.stateObservable;
|
|
||||||
}
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
readonly userId: UserId,
|
readonly userId: UserId,
|
||||||
private keyDefinition: KeyDefinition<T>,
|
private keyDefinition: KeyDefinition<T>,
|
||||||
private encryptService: EncryptService,
|
|
||||||
private chosenLocation: AbstractStorageService & ObservableStorageService,
|
private chosenLocation: AbstractStorageService & ObservableStorageService,
|
||||||
) {
|
) {
|
||||||
this.storageKey = userKeyBuilder(this.userId, this.keyDefinition);
|
this.storageKey = userKeyBuilder(this.userId, this.keyDefinition);
|
||||||
|
const initialStorageGet$ = defer(() => {
|
||||||
|
return getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer);
|
||||||
|
});
|
||||||
|
|
||||||
|
const latestStorage$ = chosenLocation.updates$.pipe(
|
||||||
|
filter((s) => s.key === this.storageKey),
|
||||||
|
switchMap(async (storageUpdate) => {
|
||||||
|
if (storageUpdate.updateType === "remove") {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return await getStoredValue(
|
||||||
|
this.storageKey,
|
||||||
|
this.chosenLocation,
|
||||||
|
this.keyDefinition.deserializer,
|
||||||
|
);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
this.state$ = merge(initialStorageGet$, latestStorage$).pipe(
|
||||||
|
share({
|
||||||
|
connector: () => new ReplaySubject<T>(1),
|
||||||
|
resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
this.combinedState$ = combineLatest([of(userId), this.state$]);
|
||||||
}
|
}
|
||||||
|
|
||||||
async update<TCombine>(
|
async update<TCombine>(
|
||||||
@ -85,94 +103,10 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
|
|||||||
return newState;
|
return newState;
|
||||||
}
|
}
|
||||||
|
|
||||||
private initializeObservable() {
|
|
||||||
this.storageUpdateSubscription = this.chosenLocation.updates$
|
|
||||||
.pipe(
|
|
||||||
filter((update) => update.key === this.storageKey),
|
|
||||||
switchMap(async (update) => {
|
|
||||||
if (update.updateType === "remove") {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return await this.getFromState();
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
.subscribe((v) => this.stateSubject.next(v));
|
|
||||||
|
|
||||||
this.subscriberCount.subscribe((count) => {
|
|
||||||
if (count === 0 && this.stateObservable != null) {
|
|
||||||
this.triggerCleanup();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
|
|
||||||
// trigger populating it immediately.
|
|
||||||
this.getFromState().then((s) => {
|
|
||||||
this.stateSubject.next(s);
|
|
||||||
});
|
|
||||||
|
|
||||||
return new Observable<T>((subscriber) => {
|
|
||||||
this.incrementSubscribers();
|
|
||||||
|
|
||||||
// reinitialize listeners after cleanup
|
|
||||||
if (this.reinitialize) {
|
|
||||||
this.reinitialize = false;
|
|
||||||
this.initializeObservable();
|
|
||||||
}
|
|
||||||
|
|
||||||
const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
|
|
||||||
subscriber.unsubscribe = () => {
|
|
||||||
this.decrementSubscribers();
|
|
||||||
prevUnsubscribe();
|
|
||||||
};
|
|
||||||
|
|
||||||
return this.stateSubject
|
|
||||||
.pipe(
|
|
||||||
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
|
|
||||||
filter<T>((i) => i != FAKE_DEFAULT),
|
|
||||||
)
|
|
||||||
.subscribe(subscriber);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** For use in update methods, does not wait for update to complete before yielding state.
|
/** For use in update methods, does not wait for update to complete before yielding state.
|
||||||
* The expectation is that that await is already done
|
* The expectation is that that await is already done
|
||||||
*/
|
*/
|
||||||
private async getStateForUpdate() {
|
private async getStateForUpdate() {
|
||||||
const currentValue = this.stateSubject.getValue();
|
return await firstValueFrom(this.state$);
|
||||||
return currentValue === FAKE_DEFAULT
|
|
||||||
? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer)
|
|
||||||
: currentValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
async getFromState(): Promise<T> {
|
|
||||||
if (this.updatePromise != null) {
|
|
||||||
return await this.updatePromise;
|
|
||||||
}
|
|
||||||
return await getStoredValue(
|
|
||||||
this.storageKey,
|
|
||||||
this.chosenLocation,
|
|
||||||
this.keyDefinition.deserializer,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private incrementSubscribers() {
|
|
||||||
this.subscriberCount.next(this.subscriberCount.value + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private decrementSubscribers() {
|
|
||||||
this.subscriberCount.next(this.subscriberCount.value - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private triggerCleanup() {
|
|
||||||
setTimeout(() => {
|
|
||||||
if (this.subscriberCount.value === 0) {
|
|
||||||
this.updatePromise = null;
|
|
||||||
this.storageUpdateSubscription.unsubscribe();
|
|
||||||
this.subscriberCount.complete();
|
|
||||||
this.subscriberCount = new BehaviorSubject<number>(0);
|
|
||||||
this.stateSubject.next(FAKE_DEFAULT);
|
|
||||||
this.reinitialize = true;
|
|
||||||
}
|
|
||||||
}, this.keyDefinition.cleanupDelayMs);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,12 +4,22 @@ import { UserId } from "../../types/guid";
|
|||||||
|
|
||||||
import { StateUpdateOptions } from "./state-update-options";
|
import { StateUpdateOptions } from "./state-update-options";
|
||||||
|
|
||||||
|
export type CombinedState<T> = readonly [userId: UserId, state: T];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A helper object for interacting with state that is scoped to a specific user.
|
* A helper object for interacting with state that is scoped to a specific user.
|
||||||
*/
|
*/
|
||||||
export interface UserState<T> {
|
export interface UserState<T> {
|
||||||
|
/**
|
||||||
|
* Emits a stream of data.
|
||||||
|
*/
|
||||||
readonly state$: Observable<T>;
|
readonly state$: Observable<T>;
|
||||||
readonly getFromState: () => Promise<T>;
|
|
||||||
|
/**
|
||||||
|
* Emits a stream of data alongside the user id the data corresponds to.
|
||||||
|
*/
|
||||||
|
readonly combinedState$: Observable<CombinedState<T>>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates backing stores for the active user.
|
* Updates backing stores for the active user.
|
||||||
* @param configureState function that takes the current state and returns the new state
|
* @param configureState function that takes the current state and returns the new state
|
||||||
|
Loading…
Reference in New Issue
Block a user