1
0
mirror of https://github.com/bitwarden/browser.git synced 2024-11-28 12:45:45 +01:00

Revert "Ps/avoid state emit until updated (#7124)" (#7187)

This reverts commit 38c335d8fb.
This commit is contained in:
Matt Gibson 2023-12-12 08:07:42 -05:00 committed by GitHub
parent 79dbe051c8
commit 08b69fffe1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 205 additions and 1126 deletions

View File

@ -59,7 +59,7 @@ export class FakeStorageService implements AbstractStorageService {
return Promise.resolve(this.store[key] != null); return Promise.resolve(this.store[key] != null);
} }
save<T>(key: string, obj: T, options?: StorageOptions): Promise<void> { save<T>(key: string, obj: T, options?: StorageOptions): Promise<void> {
this.mock.save(key, obj, options); this.mock.save(key, options);
this.store[key] = obj; this.store[key] = obj;
this.updatesSubject.next({ key: key, updateType: "save" }); this.updatesSubject.next({ key: key, updateType: "save" });
return Promise.resolve(); return Promise.resolve();

View File

@ -69,10 +69,6 @@ export function trackEmissions<T>(observable: Observable<T>): T[] {
case "boolean": case "boolean":
emissions.push(value); emissions.push(value);
break; break;
case "symbol":
// Cheating types to make symbols work at all
emissions.push(value.toString() as T);
break;
default: { default: {
emissions.push(clone(value)); emissions.push(clone(value));
} }
@ -89,7 +85,7 @@ function clone(value: any): any {
} }
} }
export async function awaitAsync(ms = 1) { export async function awaitAsync(ms = 0) {
if (ms < 1) { if (ms < 1) {
await Promise.resolve(); await Promise.resolve();
} else { } else {

View File

@ -2,7 +2,7 @@
* need to update test environment so trackEmissions works appropriately * need to update test environment so trackEmissions works appropriately
* @jest-environment ../shared/test.environment.ts * @jest-environment ../shared/test.environment.ts
*/ */
import { any, anySymbol, mock } from "jest-mock-extended"; import { any, mock } from "jest-mock-extended";
import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs"; import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs";
import { Jsonify } from "type-fest"; import { Jsonify } from "type-fest";
@ -11,7 +11,7 @@ import { FakeStorageService } from "../../../../spec/fake-storage.service";
import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service"; import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service";
import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; import { AuthenticationStatus } from "../../../auth/enums/authentication-status";
import { UserId } from "../../../types/guid"; import { UserId } from "../../../types/guid";
import { KeyDefinition, userKeyBuilder } from "../key-definition"; import { KeyDefinition } from "../key-definition";
import { StateDefinition } from "../state-definition"; import { StateDefinition } from "../state-definition";
import { DefaultActiveUserState } from "./default-active-user-state"; import { DefaultActiveUserState } from "./default-active-user-state";
@ -32,10 +32,9 @@ class TestState {
} }
const testStateDefinition = new StateDefinition("fake", "disk"); const testStateDefinition = new StateDefinition("fake", "disk");
const cleanupDelayMs = 10;
const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", { const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", {
deserializer: TestState.fromJSON, deserializer: TestState.fromJSON,
cleanupDelayMs,
}); });
describe("DefaultActiveUserState", () => { describe("DefaultActiveUserState", () => {
@ -57,14 +56,10 @@ describe("DefaultActiveUserState", () => {
); );
}); });
const makeUserId = (id: string) => {
return id != null ? (`00000000-0000-1000-a000-00000000000${id}` as UserId) : undefined;
};
const changeActiveUser = async (id: string) => { const changeActiveUser = async (id: string) => {
const userId = makeUserId(id); const userId = id != null ? `00000000-0000-1000-a000-00000000000${id}` : undefined;
activeAccountSubject.next({ activeAccountSubject.next({
id: userId, id: userId as UserId,
email: `test${id}@example.com`, email: `test${id}@example.com`,
name: `Test User ${id}`, name: `Test User ${id}`,
status: AuthenticationStatus.Unlocked, status: AuthenticationStatus.Unlocked,
@ -95,7 +90,7 @@ describe("DefaultActiveUserState", () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(userState.state$);
// User signs in // User signs in
await changeActiveUser("1"); changeActiveUser("1");
await awaitAsync(); await awaitAsync();
// Service does an update // Service does an update
@ -116,17 +111,17 @@ describe("DefaultActiveUserState", () => {
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
1, 1,
"user_00000000-0000-1000-a000-000000000001_fake_fake", "user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options any(),
); );
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
2, 2,
"user_00000000-0000-1000-a000-000000000001_fake_fake", "user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options any(),
); );
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
3, 3,
"user_00000000-0000-1000-a000-000000000002_fake_fake", "user_00000000-0000-1000-a000-000000000002_fake_fake",
any(), // options any(),
); );
// Should only have saved data for the first user // Should only have saved data for the first user
@ -134,8 +129,7 @@ describe("DefaultActiveUserState", () => {
expect(diskStorageService.mock.save).toHaveBeenNthCalledWith( expect(diskStorageService.mock.save).toHaveBeenNthCalledWith(
1, 1,
"user_00000000-0000-1000-a000-000000000001_fake_fake", "user_00000000-0000-1000-a000-000000000001_fake_fake",
updatedState, any(),
any(), // options
); );
}); });
@ -189,17 +183,15 @@ describe("DefaultActiveUserState", () => {
}); });
it("should not emit a previous users value if that user is no longer active", async () => { it("should not emit a previous users value if that user is no longer active", async () => {
const user1Data: Jsonify<TestState> = { diskStorageService.internalUpdateStore({
"user_00000000-0000-1000-a000-000000000001_fake_fake": {
date: "2020-09-21T13:14:17.648Z", date: "2020-09-21T13:14:17.648Z",
array: ["value"], array: ["value"],
}; } as Jsonify<TestState>,
const user2Data: Jsonify<TestState> = { "user_00000000-0000-1000-a000-000000000002_fake_fake": {
date: "2020-09-21T13:14:17.648Z", date: "2020-09-21T13:14:17.648Z",
array: [], array: [],
}; } as Jsonify<TestState>,
diskStorageService.internalUpdateStore({
"user_00000000-0000-1000-a000-000000000001_fake_fake": user1Data,
"user_00000000-0000-1000-a000-000000000002_fake_fake": user2Data,
}); });
// This starts one subscription on the observable for tracking emissions throughout // This starts one subscription on the observable for tracking emissions throughout
@ -211,7 +203,7 @@ describe("DefaultActiveUserState", () => {
// This should always return a value right await // This should always return a value right await
const value = await firstValueFrom(userState.state$); const value = await firstValueFrom(userState.state$);
expect(value).toEqual(user1Data); expect(value).toBeTruthy();
// Make it such that there is no active user // Make it such that there is no active user
await changeActiveUser(undefined); await changeActiveUser(undefined);
@ -230,34 +222,20 @@ describe("DefaultActiveUserState", () => {
rejectedError = err; rejectedError = err;
}); });
expect(resolvedValue).toBeUndefined(); expect(resolvedValue).toBeFalsy();
expect(rejectedError).not.toBeUndefined(); expect(rejectedError).toBeTruthy();
expect(rejectedError.message).toBe("Timeout has occurred"); expect(rejectedError.message).toBe("Timeout has occurred");
// We need to figure out if something should be emitted // We need to figure out if something should be emitted
// when there becomes no active user, if we don't want that to emit // when there becomes no active user, if we don't want that to emit
// this value is correct. // this value is correct.
expect(emissions).toEqual([user1Data]); expect(emissions).toHaveLength(2);
});
it("should not emit twice if there are two listeners", async () => {
await changeActiveUser("1");
const emissions = trackEmissions(userState.state$);
const emissions2 = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([
null, // Initial value
]);
expect(emissions2).toEqual([
null, // Initial value
]);
}); });
describe("update", () => { describe("update", () => {
const newData = { date: new Date(), array: ["test"] }; const newData = { date: new Date(), array: ["test"] };
beforeEach(async () => { beforeEach(async () => {
await changeActiveUser("1"); changeActiveUser("1");
}); });
it("should save on update", async () => { it("should save on update", async () => {
@ -337,8 +315,6 @@ describe("DefaultActiveUserState", () => {
return initialData; return initialData;
}); });
await awaitAsync();
await userState.update((state, dependencies) => { await userState.update((state, dependencies) => {
expect(state).toEqual(initialData); expect(state).toEqual(initialData);
return newData; return newData;
@ -353,285 +329,4 @@ describe("DefaultActiveUserState", () => {
]); ]);
}); });
}); });
describe("update races", () => {
const newData = { date: new Date(), array: ["test"] };
const userId = makeUserId("1");
beforeEach(async () => {
await changeActiveUser("1");
await awaitAsync();
});
test("subscriptions during an update should receive the current and latest", async () => {
const oldData = { date: new Date(2019, 1, 1), array: ["oldValue1"] };
await userState.update(() => {
return oldData;
});
const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => {
emissions2 = trackEmissions(userState.state$);
await originalSave(key, obj);
});
const val = await userState.update(() => {
return newData;
});
await awaitAsync(10);
expect(val).toEqual(newData);
expect(emissions).toEqual([initialData, newData]);
expect(emissions2).toEqual([initialData, newData]);
});
test("subscription during an aborted update should receive the last value", async () => {
// Seed with interesting data
const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const val = await userState.update(
(state) => {
return newData;
},
{
shouldUpdate: () => {
emissions2 = trackEmissions(userState.state$);
return false;
},
},
);
await awaitAsync();
expect(val).toEqual(initialData);
expect(emissions).toEqual([initialData]);
expect(emissions2).toEqual([initialData]);
});
test("updates should wait until previous update is complete", async () => {
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async (key: string, obj: any) => {
let resolved = false;
await Promise.race([
userState.update(() => {
// deadlocks
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(false);
})
.mockImplementation((...args) => {
return originalSave(...args);
});
await userState.update(() => {
return newData;
});
});
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await userState.update((state) => {
return newData;
});
expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual(`user_${userId}_fake_fake`);
expect(call[1]).toEqual(newData);
});
it("does not await updates if the active user changes", async () => {
const initialUserId = (await firstValueFrom(accountService.activeAccount$)).id;
expect(initialUserId).toBe(userId);
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async (key: string, obj: any) => {
let resolved = false;
await changeActiveUser("2");
await Promise.race([
userState.update(() => {
// should not deadlock because we updated the user
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(true);
})
.mockImplementation((...args) => {
return originalSave(...args);
});
await userState.update(() => {
return newData;
});
});
it("stores updates for users in the correct place when active user changes mid-update", async () => {
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const user2Data = { date: new Date(), array: ["user 2 data"] };
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async (key: string, obj: any) => {
let resolved = false;
await changeActiveUser("2");
await Promise.race([
userState.update(() => {
// should not deadlock because we updated the user
resolved = true;
return user2Data;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(true);
await originalSave(key, obj);
})
.mockImplementation((...args) => {
return originalSave(...args);
});
await userState.update(() => {
return newData;
});
await awaitAsync();
expect(diskStorageService.mock.save).toHaveBeenCalledTimes(2);
const innerCall = diskStorageService.mock.save.mock.calls[0];
expect(innerCall[0]).toEqual(`user_${makeUserId("2")}_fake_fake`);
expect(innerCall[1]).toEqual(user2Data);
const outerCall = diskStorageService.mock.save.mock.calls[1];
expect(outerCall[0]).toEqual(`user_${makeUserId("1")}_fake_fake`);
expect(outerCall[1]).toEqual(newData);
});
});
describe("cleanup", () => {
const newData = { date: new Date(), array: ["test"] };
const userId = makeUserId("1");
let userKey: string;
beforeEach(async () => {
await changeActiveUser("1");
userKey = userKeyBuilder(userId, testKeyDefinition);
});
async function assertClean() {
const emissions = trackEmissions(userState["stateSubject"]);
const initial = structuredClone(emissions);
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(emissions).toEqual(initial); // no longer listening to storage updates
}
it("should cleanup after last subscriber", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("should not cleanup if there are still subscribers", async () => {
const subscription1 = userState.state$.subscribe();
const sub2Emissions: TestState[] = [];
const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v));
await awaitAsync(); // storage updates are behind a promise
subscription1.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
expect(userState["subscriberCount"].getValue()).toBe(1);
// Still be listening to storage updates
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(sub2Emissions).toEqual([null, newData]);
subscription2.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("can re-initialize after cleanup", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
const emissions = trackEmissions(userState.state$);
await awaitAsync();
diskStorageService.save(userKey, newData);
await awaitAsync();
expect(emissions).toEqual([null, newData]);
});
it("should not cleanup if a subscriber joins during the cleanup delay", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
await diskStorageService.save(userKey, newData);
await awaitAsync();
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);
expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
});
});
}); });

View File

@ -4,12 +4,12 @@ import {
map, map,
shareReplay, shareReplay,
switchMap, switchMap,
tap,
defer,
firstValueFrom, firstValueFrom,
combineLatestWith, combineLatestWith,
filter, filter,
timeout, timeout,
Subscription,
tap,
} from "rxjs"; } from "rxjs";
import { AccountService } from "../../../auth/abstractions/account.service"; import { AccountService } from "../../../auth/abstractions/account.service";
@ -31,21 +31,13 @@ const FAKE_DEFAULT = Symbol("fakeDefault");
export class DefaultActiveUserState<T> implements ActiveUserState<T> { export class DefaultActiveUserState<T> implements ActiveUserState<T> {
[activeMarker]: true; [activeMarker]: true;
private formattedKey$: Observable<string>; private formattedKey$: Observable<string>;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private activeAccountUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject< protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT); >(FAKE_DEFAULT);
private stateSubject$ = this.stateSubject.asObservable(); private stateSubject$ = this.stateSubject.asObservable();
get state$() { state$: Observable<T>;
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
constructor( constructor(
protected keyDefinition: KeyDefinition<T>, protected keyDefinition: KeyDefinition<T>,
@ -59,12 +51,62 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
? userKeyBuilder(account.id, this.keyDefinition) ? userKeyBuilder(account.id, this.keyDefinition)
: null, : null,
), ),
tap(() => {
// We have a new key, so we should forget about previous update promises
this.updatePromise = null;
}),
shareReplay({ bufferSize: 1, refCount: false }), shareReplay({ bufferSize: 1, refCount: false }),
); );
const activeAccountData$ = this.formattedKey$.pipe(
switchMap(async (key) => {
if (key == null) {
return FAKE_DEFAULT;
}
return await getStoredValue(
key,
this.chosenStorageLocation,
this.keyDefinition.deserializer,
);
}),
// Share the execution
shareReplay({ refCount: false, bufferSize: 1 }),
);
const storageUpdates$ = this.chosenStorageLocation.updates$.pipe(
combineLatestWith(this.formattedKey$),
filter(([update, key]) => key !== null && update.key === key),
switchMap(async ([update, key]) => {
if (update.updateType === "remove") {
return null;
}
const data = await getStoredValue(
key,
this.chosenStorageLocation,
this.keyDefinition.deserializer,
);
return data;
}),
);
// Whomever subscribes to this data, should be notified of updated data
// if someone calls my update() method, or the active user changes.
this.state$ = defer(() => {
const accountChangeSubscription = activeAccountData$.subscribe((data) => {
this.stateSubject.next(data);
});
const storageUpdateSubscription = storageUpdates$.subscribe((data) => {
this.stateSubject.next(data);
});
return this.stateSubject$.pipe(
tap({
complete: () => {
accountChangeSubscription.unsubscribe();
storageUpdateSubscription.unsubscribe();
},
}),
);
})
// I fake the generic here because I am filtering out the other union type
// and this makes it so that typescript understands the true type
.pipe(filter<T>((value) => value != FAKE_DEFAULT));
} }
async update<TCombine>( async update<TCombine>(
@ -72,34 +114,8 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
options: StateUpdateOptions<T, TCombine> = {}, options: StateUpdateOptions<T, TCombine> = {},
): Promise<T> { ): Promise<T> {
options = populateOptionsWithDefault(options); options = populateOptionsWithDefault(options);
try {
if (this.updatePromise != null) {
await this.updatePromise;
}
this.updatePromise = this.internalUpdate(configureState, options);
const newState = await this.updatePromise;
return newState;
} finally {
this.updatePromise = null;
}
}
// TODO: this should be removed
async getFromState(): Promise<T> {
const key = await this.createKey(); const key = await this.createKey();
return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); const currentState = await this.getGuaranteedState(key);
}
createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> {
return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this);
}
private async internalUpdate<TCombine>(
configureState: (state: T, dependency: TCombine) => T,
options: StateUpdateOptions<T, TCombine>,
) {
const key = await this.createKey();
const currentState = await this.getStateForUpdate(key);
const combinedDependencies = const combinedDependencies =
options.combineLatestWith != null options.combineLatestWith != null
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
@ -114,53 +130,13 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
return newState; return newState;
} }
private initializeObservable() { async getFromState(): Promise<T> {
this.storageUpdateSubscription = this.chosenStorageLocation.updates$ const key = await this.createKey();
.pipe( return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer);
combineLatestWith(this.formattedKey$),
filter(([update, key]) => key !== null && update.key === key),
switchMap(async ([update, key]) => {
if (update.updateType === "remove") {
return null;
} }
return await this.getState(key);
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.activeAccountUpdateSubscription = this.formattedKey$ createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> {
.pipe( return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this);
switchMap(async (key) => {
if (key == null) {
return FAKE_DEFAULT;
}
return await this.getState(key);
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});
return new Observable<T>((subscriber) => {
this.incrementSubscribers();
const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};
return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter((i) => i !== FAKE_DEFAULT),
)
.subscribe(subscriber);
});
} }
protected async createKey(): Promise<string> { protected async createKey(): Promise<string> {
@ -171,47 +147,22 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
return formattedKey; return formattedKey;
} }
/** For use in update methods, does not wait for update to complete before yielding state. protected async getGuaranteedState(key: string) {
* The expectation is that that await is already done
*/
protected async getStateForUpdate(key: string) {
const currentValue = this.stateSubject.getValue(); const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT return currentValue === FAKE_DEFAULT ? await this.seedInitial(key) : currentValue;
? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer)
: currentValue;
} }
/** To be used in observables. Awaits updates to ensure they are complete */ private async seedInitial(key: string): Promise<T> {
private async getState(key: string): Promise<T> { const value = await getStoredValue(
if (this.updatePromise != null) { key,
await this.updatePromise; this.chosenStorageLocation,
} this.keyDefinition.deserializer,
return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); );
this.stateSubject.next(value);
return value;
} }
protected saveToStorage(key: string, data: T): Promise<void> { protected saveToStorage(key: string, data: T): Promise<void> {
return this.chosenStorageLocation.save(key, data); return this.chosenStorageLocation.save(key, data);
} }
private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}
private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}
private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription?.unsubscribe();
this.activeAccountUpdateSubscription?.unsubscribe();
this.stateObservable = null;
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
}
}, this.keyDefinition.cleanupDelayMs);
}
} }

View File

@ -3,7 +3,6 @@
* @jest-environment ../shared/test.environment.ts * @jest-environment ../shared/test.environment.ts
*/ */
import { anySymbol } from "jest-mock-extended";
import { firstValueFrom, of } from "rxjs"; import { firstValueFrom, of } from "rxjs";
import { Jsonify } from "type-fest"; import { Jsonify } from "type-fest";
@ -29,10 +28,9 @@ class TestState {
} }
const testStateDefinition = new StateDefinition("fake", "disk"); const testStateDefinition = new StateDefinition("fake", "disk");
const cleanupDelayMs = 10;
const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", { const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", {
deserializer: TestState.fromJSON, deserializer: TestState.fromJSON,
cleanupDelayMs,
}); });
const globalKey = globalKeyBuilder(testKeyDefinition); const globalKey = globalKeyBuilder(testKeyDefinition);
@ -81,19 +79,6 @@ describe("DefaultGlobalState", () => {
expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined); expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined);
expect(state).toBeTruthy(); expect(state).toBeTruthy();
}); });
it("should not emit twice if there are two listeners", async () => {
const emissions = trackEmissions(globalState.state$);
const emissions2 = trackEmissions(globalState.state$);
await awaitAsync();
expect(emissions).toEqual([
null, // Initial value
]);
expect(emissions2).toEqual([
null, // Initial value
]);
});
}); });
describe("update", () => { describe("update", () => {
@ -148,7 +133,6 @@ describe("DefaultGlobalState", () => {
it("should not update if shouldUpdate returns false", async () => { it("should not update if shouldUpdate returns false", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise
const result = await globalState.update( const result = await globalState.update(
(state) => { (state) => {
@ -214,194 +198,4 @@ describe("DefaultGlobalState", () => {
expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); expect(emissions).toEqual(expect.arrayContaining([initialState, newState]));
}); });
}); });
describe("update races", () => {
test("subscriptions during an update should receive the current and latest data", async () => {
const oldData = { date: new Date(2019, 1, 1) };
await globalState.update(() => {
return oldData;
});
const initialData = { date: new Date(2020, 1, 1) };
await globalState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(globalState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => {
emissions2 = trackEmissions(globalState.state$);
await originalSave(key, obj);
});
const val = await globalState.update(() => {
return newData;
});
await awaitAsync(10);
expect(val).toEqual(newData);
expect(emissions).toEqual([initialData, newData]);
expect(emissions2).toEqual([initialData, newData]);
});
test("subscription during an aborted update should receive the last value", async () => {
// Seed with interesting data
const initialData = { date: new Date(2020, 1, 1) };
await globalState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(globalState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const val = await globalState.update(
() => {
return newData;
},
{
shouldUpdate: () => {
emissions2 = trackEmissions(globalState.state$);
return false;
},
},
);
await awaitAsync();
expect(val).toEqual(initialData);
expect(emissions).toEqual([initialData]);
expect(emissions2).toEqual([initialData]);
});
test("updates should wait until previous update is complete", async () => {
trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async () => {
let resolved = false;
await Promise.race([
globalState.update(() => {
// deadlocks
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(false);
})
.mockImplementation(originalSave);
await globalState.update((state) => {
return newData;
});
});
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await globalState.update((state) => {
return newData;
});
expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual("global_fake_fake");
expect(call[1]).toEqual(newData);
});
});
describe("cleanup", () => {
async function assertClean() {
const emissions = trackEmissions(globalState["stateSubject"]);
const initial = structuredClone(emissions);
diskStorageService.save(globalKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(emissions).toEqual(initial); // no longer listening to storage updates
}
it("should cleanup after last subscriber", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise
subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("should not cleanup if there are still subscribers", async () => {
const subscription1 = globalState.state$.subscribe();
const sub2Emissions: TestState[] = [];
const subscription2 = globalState.state$.subscribe((v) => sub2Emissions.push(v));
await awaitAsync(); // storage updates are behind a promise
subscription1.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
expect(globalState["subscriberCount"].getValue()).toBe(1);
// Still be listening to storage updates
diskStorageService.save(globalKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(sub2Emissions).toEqual([null, newData]);
subscription2.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("can re-initialize after cleanup", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
const emissions = trackEmissions(globalState.state$);
await awaitAsync();
diskStorageService.save(globalKey, newData);
await awaitAsync();
expect(emissions).toEqual([null, newData]);
});
it("should not cleanup if a subscriber joins during the cleanup delay", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync();
await diskStorageService.save(globalKey, newData);
await awaitAsync();
subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);
expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
});
});
}); });

View File

@ -1,10 +1,12 @@
import { import {
BehaviorSubject, BehaviorSubject,
Observable, Observable,
Subscription, defer,
filter, filter,
firstValueFrom, firstValueFrom,
shareReplay,
switchMap, switchMap,
tap,
timeout, timeout,
} from "rxjs"; } from "rxjs";
@ -21,25 +23,54 @@ const FAKE_DEFAULT = Symbol("fakeDefault");
export class DefaultGlobalState<T> implements GlobalState<T> { export class DefaultGlobalState<T> implements GlobalState<T> {
private storageKey: string; private storageKey: string;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject< protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT); >(FAKE_DEFAULT);
get state$() { state$: Observable<T>;
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
constructor( constructor(
private keyDefinition: KeyDefinition<T>, private keyDefinition: KeyDefinition<T>,
private chosenLocation: AbstractStorageService & ObservableStorageService, private chosenLocation: AbstractStorageService & ObservableStorageService,
) { ) {
this.storageKey = globalKeyBuilder(this.keyDefinition); this.storageKey = globalKeyBuilder(this.keyDefinition);
const storageUpdates$ = this.chosenLocation.updates$.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}),
shareReplay({ bufferSize: 1, refCount: false }),
);
this.state$ = defer(() => {
const storageUpdateSubscription = storageUpdates$.subscribe((value) => {
this.stateSubject.next(value);
});
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return this.stateSubject.pipe(
tap({
complete: () => {
storageUpdateSubscription.unsubscribe();
},
}),
);
}).pipe(
shareReplay({ refCount: false, bufferSize: 1 }),
filter<T>((i) => i != FAKE_DEFAULT),
);
} }
async update<TCombine>( async update<TCombine>(
@ -47,24 +78,7 @@ export class DefaultGlobalState<T> implements GlobalState<T> {
options: StateUpdateOptions<T, TCombine> = {}, options: StateUpdateOptions<T, TCombine> = {},
): Promise<T> { ): Promise<T> {
options = populateOptionsWithDefault(options); options = populateOptionsWithDefault(options);
if (this.updatePromise != null) { const currentState = await this.getGuaranteedState();
await this.updatePromise;
}
try {
this.updatePromise = this.internalUpdate(configureState, options);
const newState = await this.updatePromise;
return newState;
} finally {
this.updatePromise = null;
}
}
private async internalUpdate<TCombine>(
configureState: (state: T, dependency: TCombine) => T,
options: StateUpdateOptions<T, TCombine>,
): Promise<T> {
const currentState = await this.getStateForUpdate();
const combinedDependencies = const combinedDependencies =
options.combineLatestWith != null options.combineLatestWith != null
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
@ -79,86 +93,16 @@ export class DefaultGlobalState<T> implements GlobalState<T> {
return newState; return newState;
} }
private initializeObservable() { private async getGuaranteedState() {
this.storageUpdateSubscription = this.chosenLocation.updates$
.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await this.getFromState();
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});
// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
// trigger populating it immediately.
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return new Observable<T>((subscriber) => {
this.incrementSubscribers();
const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};
return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter<T>((i) => i != FAKE_DEFAULT),
)
.subscribe(subscriber);
});
}
/** For use in update methods, does not wait for update to complete before yielding state.
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
const currentValue = this.stateSubject.getValue(); const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue;
} }
async getFromState(): Promise<T> { async getFromState(): Promise<T> {
if (this.updatePromise != null) {
return await this.updatePromise;
}
return await getStoredValue( return await getStoredValue(
this.storageKey, this.storageKey,
this.chosenLocation, this.chosenLocation,
this.keyDefinition.deserializer, this.keyDefinition.deserializer,
); );
} }
private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}
private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}
private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription.unsubscribe();
this.stateObservable = null;
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
}
}, this.keyDefinition.cleanupDelayMs);
}
} }

View File

@ -3,7 +3,6 @@
* @jest-environment ../shared/test.environment.ts * @jest-environment ../shared/test.environment.ts
*/ */
import { anySymbol } from "jest-mock-extended";
import { firstValueFrom, of } from "rxjs"; import { firstValueFrom, of } from "rxjs";
import { Jsonify } from "type-fest"; import { Jsonify } from "type-fest";
@ -31,22 +30,21 @@ class TestState {
} }
const testStateDefinition = new StateDefinition("fake", "disk"); const testStateDefinition = new StateDefinition("fake", "disk");
const cleanupDelayMs = 10;
const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", { const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", {
deserializer: TestState.fromJSON, deserializer: TestState.fromJSON,
cleanupDelayMs,
}); });
const userId = Utils.newGuid() as UserId; const userId = Utils.newGuid() as UserId;
const userKey = userKeyBuilder(userId, testKeyDefinition); const userKey = userKeyBuilder(userId, testKeyDefinition);
describe("DefaultSingleUserState", () => { describe("DefaultSingleUserState", () => {
let diskStorageService: FakeStorageService; let diskStorageService: FakeStorageService;
let userState: DefaultSingleUserState<TestState>; let globalState: DefaultSingleUserState<TestState>;
const newData = { date: new Date() }; const newData = { date: new Date() };
beforeEach(() => { beforeEach(() => {
diskStorageService = new FakeStorageService(); diskStorageService = new FakeStorageService();
userState = new DefaultSingleUserState( globalState = new DefaultSingleUserState(
userId, userId,
testKeyDefinition, testKeyDefinition,
null, // Not testing anything with encrypt service null, // Not testing anything with encrypt service
@ -60,7 +58,7 @@ describe("DefaultSingleUserState", () => {
describe("state$", () => { describe("state$", () => {
it("should emit when storage updates", async () => { it("should emit when storage updates", async () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await diskStorageService.save(userKey, newData); await diskStorageService.save(userKey, newData);
await awaitAsync(); await awaitAsync();
@ -71,7 +69,7 @@ describe("DefaultSingleUserState", () => {
}); });
it("should not emit when update key does not match", async () => { it("should not emit when update key does not match", async () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await diskStorageService.save("wrong_key", newData); await diskStorageService.save("wrong_key", newData);
expect(emissions).toHaveLength(0); expect(emissions).toHaveLength(0);
@ -84,7 +82,7 @@ describe("DefaultSingleUserState", () => {
}); });
diskStorageService.internalUpdateStore(initialStorage); diskStorageService.internalUpdateStore(initialStorage);
const state = await firstValueFrom(userState.state$); const state = await firstValueFrom(globalState.state$);
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1); expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1);
expect(diskStorageService.mock.get).toHaveBeenCalledWith( expect(diskStorageService.mock.get).toHaveBeenCalledWith(
`user_${userId}_fake_fake`, `user_${userId}_fake_fake`,
@ -96,7 +94,7 @@ describe("DefaultSingleUserState", () => {
describe("update", () => { describe("update", () => {
it("should save on update", async () => { it("should save on update", async () => {
const result = await userState.update((state) => { const result = await globalState.update((state) => {
return newData; return newData;
}); });
@ -105,10 +103,10 @@ describe("DefaultSingleUserState", () => {
}); });
it("should emit once per update", async () => { it("should emit once per update", async () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
await userState.update((state) => { await globalState.update((state) => {
return newData; return newData;
}); });
@ -121,12 +119,12 @@ describe("DefaultSingleUserState", () => {
}); });
it("should provided combined dependencies", async () => { it("should provided combined dependencies", async () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
const combinedDependencies = { date: new Date() }; const combinedDependencies = { date: new Date() };
await userState.update( await globalState.update(
(state, dependencies) => { (state, dependencies) => {
expect(dependencies).toEqual(combinedDependencies); expect(dependencies).toEqual(combinedDependencies);
return newData; return newData;
@ -145,10 +143,9 @@ describe("DefaultSingleUserState", () => {
}); });
it("should not update if shouldUpdate returns false", async () => { it("should not update if shouldUpdate returns false", async () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise
const result = await userState.update( const result = await globalState.update(
(state) => { (state) => {
return newData; return newData;
}, },
@ -163,18 +160,18 @@ describe("DefaultSingleUserState", () => {
}); });
it("should provide the update callback with the current State", async () => { it("should provide the update callback with the current State", async () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
// Seed with interesting data // Seed with interesting data
const initialData = { date: new Date(2020, 1, 1) }; const initialData = { date: new Date(2020, 1, 1) };
await userState.update((state, dependencies) => { await globalState.update((state, dependencies) => {
return initialData; return initialData;
}); });
await awaitAsync(); await awaitAsync();
await userState.update((state) => { await globalState.update((state) => {
expect(state).toEqual(initialData); expect(state).toEqual(initialData);
return newData; return newData;
}); });
@ -196,14 +193,14 @@ describe("DefaultSingleUserState", () => {
initialStorage[userKey] = initialState; initialStorage[userKey] = initialState;
diskStorageService.internalUpdateStore(initialStorage); diskStorageService.internalUpdateStore(initialStorage);
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
const newState = { const newState = {
...initialState, ...initialState,
date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1), date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1),
}; };
const actual = await userState.update((existingState) => newState); const actual = await globalState.update((existingState) => newState);
await awaitAsync(); await awaitAsync();
@ -212,194 +209,4 @@ describe("DefaultSingleUserState", () => {
expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); expect(emissions).toEqual(expect.arrayContaining([initialState, newState]));
}); });
}); });
describe("update races", () => {
test("subscriptions during an update should receive the current and latest data", async () => {
const oldData = { date: new Date(2019, 1, 1) };
await userState.update(() => {
return oldData;
});
const initialData = { date: new Date(2020, 1, 1) };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => {
emissions2 = trackEmissions(userState.state$);
await originalSave(key, obj);
});
const val = await userState.update(() => {
return newData;
});
await awaitAsync(10);
expect(val).toEqual(newData);
expect(emissions).toEqual([initialData, newData]);
expect(emissions2).toEqual([initialData, newData]);
});
test("subscription during an aborted update should receive the last value", async () => {
// Seed with interesting data
const initialData = { date: new Date(2020, 1, 1) };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const val = await userState.update(
(state) => {
return newData;
},
{
shouldUpdate: () => {
emissions2 = trackEmissions(userState.state$);
return false;
},
},
);
await awaitAsync();
expect(val).toEqual(initialData);
expect(emissions).toEqual([initialData]);
expect(emissions2).toEqual([initialData]);
});
test("updates should wait until previous update is complete", async () => {
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async () => {
let resolved = false;
await Promise.race([
userState.update(() => {
// deadlocks
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(false);
})
.mockImplementation(originalSave);
await userState.update((state) => {
return newData;
});
});
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await userState.update((state) => {
return newData;
});
expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual(`user_${userId}_fake_fake`);
expect(call[1]).toEqual(newData);
});
});
describe("cleanup", () => {
async function assertClean() {
const emissions = trackEmissions(userState["stateSubject"]);
const initial = structuredClone(emissions);
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(emissions).toEqual(initial); // no longer listening to storage updates
}
it("should cleanup after last subscriber", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("should not cleanup if there are still subscribers", async () => {
const subscription1 = userState.state$.subscribe();
const sub2Emissions: TestState[] = [];
const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v));
await awaitAsync(); // storage updates are behind a promise
subscription1.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
expect(userState["subscriberCount"].getValue()).toBe(1);
// Still be listening to storage updates
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(sub2Emissions).toEqual([null, newData]);
subscription2.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("can re-initialize after cleanup", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
const emissions = trackEmissions(userState.state$);
await awaitAsync();
diskStorageService.save(userKey, newData);
await awaitAsync();
expect(emissions).toEqual([null, newData]);
});
it("should not cleanup if a subscriber joins during the cleanup delay", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
await diskStorageService.save(userKey, newData);
await awaitAsync();
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);
expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
});
});
}); });

View File

@ -1,10 +1,12 @@
import { import {
BehaviorSubject, BehaviorSubject,
Observable, Observable,
Subscription, defer,
filter, filter,
firstValueFrom, firstValueFrom,
shareReplay,
switchMap, switchMap,
tap,
timeout, timeout,
} from "rxjs"; } from "rxjs";
@ -21,24 +23,16 @@ import { Converter, SingleUserState } from "../user-state";
import { DefaultDerivedUserState } from "./default-derived-state"; import { DefaultDerivedUserState } from "./default-derived-state";
import { getStoredValue } from "./util"; import { getStoredValue } from "./util";
const FAKE_DEFAULT = Symbol("fakeDefault"); const FAKE_DEFAULT = Symbol("fakeDefault");
export class DefaultSingleUserState<T> implements SingleUserState<T> { export class DefaultSingleUserState<T> implements SingleUserState<T> {
private storageKey: string; private storageKey: string;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject< protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT); >(FAKE_DEFAULT);
get state$() { state$: Observable<T>;
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
constructor( constructor(
readonly userId: UserId, readonly userId: UserId,
@ -47,6 +41,42 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
private chosenLocation: AbstractStorageService & ObservableStorageService, private chosenLocation: AbstractStorageService & ObservableStorageService,
) { ) {
this.storageKey = userKeyBuilder(this.userId, this.keyDefinition); this.storageKey = userKeyBuilder(this.userId, this.keyDefinition);
const storageUpdates$ = this.chosenLocation.updates$.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}),
shareReplay({ bufferSize: 1, refCount: false }),
);
this.state$ = defer(() => {
const storageUpdateSubscription = storageUpdates$.subscribe((value) => {
this.stateSubject.next(value);
});
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return this.stateSubject.pipe(
tap({
complete: () => {
storageUpdateSubscription.unsubscribe();
},
}),
);
}).pipe(
shareReplay({ refCount: false, bufferSize: 1 }),
filter<T>((i) => i != FAKE_DEFAULT),
);
} }
async update<TCombine>( async update<TCombine>(
@ -54,28 +84,7 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
options: StateUpdateOptions<T, TCombine> = {}, options: StateUpdateOptions<T, TCombine> = {},
): Promise<T> { ): Promise<T> {
options = populateOptionsWithDefault(options); options = populateOptionsWithDefault(options);
if (this.updatePromise != null) { const currentState = await this.getGuaranteedState();
await this.updatePromise;
}
try {
this.updatePromise = this.internalUpdate(configureState, options);
const newState = await this.updatePromise;
return newState;
} finally {
this.updatePromise = null;
}
}
createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> {
return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this);
}
private async internalUpdate<TCombine>(
configureState: (state: T, dependency: TCombine) => T,
options: StateUpdateOptions<T, TCombine>,
): Promise<T> {
const currentState = await this.getStateForUpdate();
const combinedDependencies = const combinedDependencies =
options.combineLatestWith != null options.combineLatestWith != null
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
@ -90,86 +99,20 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
return newState; return newState;
} }
private initializeObservable() { createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> {
this.storageUpdateSubscription = this.chosenLocation.updates$ return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this);
.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await this.getFromState();
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});
// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
// trigger populating it immediately.
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return new Observable<T>((subscriber) => {
this.incrementSubscribers();
const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};
return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter<T>((i) => i != FAKE_DEFAULT),
)
.subscribe(subscriber);
});
} }
/** For use in update methods, does not wait for update to complete before yielding state. private async getGuaranteedState() {
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
const currentValue = this.stateSubject.getValue(); const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue;
} }
async getFromState(): Promise<T> { async getFromState(): Promise<T> {
if (this.updatePromise != null) {
return await this.updatePromise;
}
return await getStoredValue( return await getStoredValue(
this.storageKey, this.storageKey,
this.chosenLocation, this.chosenLocation,
this.keyDefinition.deserializer, this.keyDefinition.deserializer,
); );
} }
private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}
private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}
private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription.unsubscribe();
this.stateObservable = null;
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
}
}, this.keyDefinition.cleanupDelayMs);
}
} }

View File

@ -18,37 +18,6 @@ describe("KeyDefinition", () => {
}); });
}); });
describe("cleanupDelayMs", () => {
it("defaults to 1000ms", () => {
const keyDefinition = new KeyDefinition<boolean>(fakeStateDefinition, "fake", {
deserializer: (value) => value,
});
expect(keyDefinition).toBeTruthy();
expect(keyDefinition.cleanupDelayMs).toBe(1000);
});
it("can be overridden", () => {
const keyDefinition = new KeyDefinition<boolean>(fakeStateDefinition, "fake", {
deserializer: (value) => value,
cleanupDelayMs: 500,
});
expect(keyDefinition).toBeTruthy();
expect(keyDefinition.cleanupDelayMs).toBe(500);
});
it.each([0, -1])("throws on 0 or negative (%s)", (testValue: number) => {
expect(
() =>
new KeyDefinition<boolean>(fakeStateDefinition, "fake", {
deserializer: (value) => value,
cleanupDelayMs: testValue,
}),
).toThrow();
});
});
describe("record", () => { describe("record", () => {
it("runs custom deserializer for each record value", () => { it("runs custom deserializer for each record value", () => {
const recordDefinition = KeyDefinition.record<boolean>(fakeStateDefinition, "fake", { const recordDefinition = KeyDefinition.record<boolean>(fakeStateDefinition, "fake", {

View File

@ -19,11 +19,6 @@ type KeyDefinitionOptions<T> = {
* @returns The fully typed version of your state. * @returns The fully typed version of your state.
*/ */
readonly deserializer: (jsonValue: Jsonify<T>) => T; readonly deserializer: (jsonValue: Jsonify<T>) => T;
/**
* The number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed.
* Defaults to 1000ms.
*/
readonly cleanupDelayMs?: number;
}; };
/** /**
@ -47,12 +42,8 @@ export class KeyDefinition<T> {
private readonly options: KeyDefinitionOptions<T>, private readonly options: KeyDefinitionOptions<T>,
) { ) {
if (options.deserializer == null) { if (options.deserializer == null) {
throw new Error(`'deserializer' is a required property on key ${this.errorKeyName}`);
}
if (options.cleanupDelayMs <= 0) {
throw new Error( throw new Error(
`'cleanupDelayMs' must be greater than 0. Value of ${options.cleanupDelayMs} passed to key ${this.errorKeyName} `, `'deserializer' is a required property on key ${stateDefinition.name} > ${key}`,
); );
} }
} }
@ -64,13 +55,6 @@ export class KeyDefinition<T> {
return this.options.deserializer; return this.options.deserializer;
} }
/**
* Gets the number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed.
*/
get cleanupDelayMs() {
return this.options.cleanupDelayMs < 0 ? 0 : this.options.cleanupDelayMs ?? 1000;
}
/** /**
* Creates a {@link KeyDefinition} for state that is an array. * Creates a {@link KeyDefinition} for state that is an array.
* @param stateDefinition The state definition to be added to the KeyDefinition * @param stateDefinition The state definition to be added to the KeyDefinition
@ -153,10 +137,6 @@ export class KeyDefinition<T> {
? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}` ? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}`
: `${scope}_${this.stateDefinition.name}_${this.key}`; : `${scope}_${this.stateDefinition.name}_${this.key}`;
} }
private get errorKeyName() {
return `${this.stateDefinition.name} > ${this.key}`;
}
} }
export type StorageKey = Opaque<string, "StorageKey">; export type StorageKey = Opaque<string, "StorageKey">;