From c48ddd8ecdcda21fe2fe78fd89d4c8aad921a90f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9C=A8=20Audrey=20=E2=9C=A8?= Date: Mon, 14 Oct 2024 17:54:20 -0400 Subject: [PATCH] move generic rx utilities to common --- libs/common/src/tools/rx.spec.ts | 353 +++++++++++++++++- libs/common/src/tools/rx.ts | 29 ++ libs/tools/generator/core/src/rx.spec.ts | 352 ----------------- libs/tools/generator/core/src/rx.ts | 99 +---- .../services/credential-generator.service.ts | 15 +- 5 files changed, 392 insertions(+), 456 deletions(-) delete mode 100644 libs/tools/generator/core/src/rx.spec.ts diff --git a/libs/common/src/tools/rx.spec.ts b/libs/common/src/tools/rx.spec.ts index 8a2c1e38f5..e2c4281517 100644 --- a/libs/common/src/tools/rx.spec.ts +++ b/libs/common/src/tools/rx.spec.ts @@ -2,11 +2,11 @@ * include structuredClone in test environment. * @jest-environment ../../../../shared/test.environment.ts */ -import { of, firstValueFrom } from "rxjs"; +import { of, firstValueFrom, Subject, tap, EmptyError } from "rxjs"; import { awaitAsync, trackEmissions } from "../../spec"; -import { distinctIfShallowMatch, reduceCollection } from "./rx"; +import { anyComplete, distinctIfShallowMatch, on, ready, reduceCollection } from "./rx"; describe("reduceCollection", () => { it.each([[null], [undefined], [[]]])( @@ -84,3 +84,352 @@ describe("distinctIfShallowMatch", () => { expect(result).toEqual([{ foo: true, bar: true }]); }); }); + +describe("anyComplete", () => { + it("emits true when its input completes", () => { + const input$ = new Subject(); + + const emissions: boolean[] = []; + anyComplete(input$).subscribe((e) => emissions.push(e)); + input$.complete(); + + expect(emissions).toEqual([true]); + }); + + it("completes when its input is already complete", () => { + const input = new Subject(); + input.complete(); + + let completed = false; + anyComplete(input).subscribe({ complete: () => (completed = true) }); + + expect(completed).toBe(true); + }); + + it("completes when any input completes", () => { + const input$ = new Subject(); + const completing$ = new Subject(); + + let completed = false; + anyComplete([input$, completing$]).subscribe({ complete: () => (completed = true) }); + completing$.complete(); + + expect(completed).toBe(true); + }); + + it("ignores emissions", () => { + const input$ = new Subject(); + + const emissions: boolean[] = []; + anyComplete(input$).subscribe((e) => emissions.push(e)); + input$.next(1); + input$.next(2); + input$.complete(); + + expect(emissions).toEqual([true]); + }); + + it("forwards errors", () => { + const input$ = new Subject(); + const expected = { some: "error" }; + + let error = null; + anyComplete(input$).subscribe({ error: (e: unknown) => (error = e) }); + input$.error(expected); + + expect(error).toEqual(expected); + }); +}); + +describe("ready", () => { + it("connects when subscribed", () => { + const watch$ = new Subject(); + let connected = false; + const source$ = new Subject().pipe(tap({ subscribe: () => (connected = true) })); + + // precondition: ready$ should be cold + const ready$ = source$.pipe(ready(watch$)); + expect(connected).toBe(false); + + ready$.subscribe(); + + expect(connected).toBe(true); + }); + + it("suppresses source emissions until its watch emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + // precondition: no emissions + source$.next(1); + expect(results).toEqual([]); + + watch$.next(); + + expect(results).toEqual([1]); + }); + + it("suppresses source emissions until all watches emit", () => { + const watchA$ = new Subject(); + const watchB$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready([watchA$, watchB$])); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + // preconditions: no emissions + source$.next(1); + expect(results).toEqual([]); + watchA$.next(); + expect(results).toEqual([]); + + watchB$.next(); + + expect(results).toEqual([1]); + }); + + it("emits the last source emission when its watch emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + // precondition: no emissions + source$.next(1); + expect(results).toEqual([]); + + source$.next(2); + watch$.next(); + + expect(results).toEqual([2]); + }); + + it("emits all source emissions after its watch emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + watch$.next(); + source$.next(1); + source$.next(2); + + expect(results).toEqual([1, 2]); + }); + + it("ignores repeated watch emissions", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + watch$.next(); + source$.next(1); + watch$.next(); + source$.next(2); + watch$.next(); + + expect(results).toEqual([1, 2]); + }); + + it("completes when its source completes", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + let completed = false; + ready$.subscribe({ complete: () => (completed = true) }); + + source$.complete(); + + expect(completed).toBeTruthy(); + }); + + it("errors when its source errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const expected = { some: "error" }; + let error = null; + ready$.subscribe({ error: (e: unknown) => (error = e) }); + + source$.error(expected); + + expect(error).toEqual(expected); + }); + + it("errors when its watch errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const expected = { some: "error" }; + let error = null; + ready$.subscribe({ error: (e: unknown) => (error = e) }); + + watch$.error(expected); + + expect(error).toEqual(expected); + }); + + it("errors when its watch completes before emitting", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + let error = null; + ready$.subscribe({ error: (e: unknown) => (error = e) }); + + watch$.complete(); + + expect(error).toBeInstanceOf(EmptyError); + }); +}); + +describe("on", () => { + it("connects when subscribed", () => { + const watch$ = new Subject(); + let connected = false; + const source$ = new Subject().pipe(tap({ subscribe: () => (connected = true) })); + + // precondition: on$ should be cold + const on$ = source$.pipe(on(watch$)); + expect(connected).toBeFalsy(); + + on$.subscribe(); + + expect(connected).toBeTruthy(); + }); + + it("suppresses source emissions until `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + // precondition: on$ should be cold + source$.next(1); + expect(results).toEqual([]); + + watch$.next(); + + expect(results).toEqual([1]); + }); + + it("repeats source emissions when `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + source$.next(1); + + watch$.next(); + watch$.next(); + + expect(results).toEqual([1, 1]); + }); + + it("updates source emissions when `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + source$.next(1); + watch$.next(); + source$.next(2); + watch$.next(); + + expect(results).toEqual([1, 2]); + }); + + it("emits a value when `on` emits before the source is ready", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + watch$.next(); + source$.next(1); + + expect(results).toEqual([1]); + }); + + it("ignores repeated `on` emissions before the source is ready", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + watch$.next(); + watch$.next(); + source$.next(1); + + expect(results).toEqual([1]); + }); + + it("emits only the latest source emission when `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + source$.next(1); + + watch$.next(); + + source$.next(2); + source$.next(3); + watch$.next(); + + expect(results).toEqual([1, 3]); + }); + + it("completes when its source completes", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + let complete: boolean = false; + source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) }); + + source$.complete(); + + expect(complete).toBeTruthy(); + }); + + it("completes when its watch completes", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + let complete: boolean = false; + source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) }); + + watch$.complete(); + + expect(complete).toBeTruthy(); + }); + + it("errors when its source errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const expected = { some: "error" }; + let error = null; + source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) }); + + source$.error(expected); + + expect(error).toEqual(expected); + }); + + it("errors when its watch errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const expected = { some: "error" }; + let error = null; + source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) }); + + watch$.error(expected); + + expect(error).toEqual(expected); + }); +}); diff --git a/libs/common/src/tools/rx.ts b/libs/common/src/tools/rx.ts index 42e53da5ab..2b06504304 100644 --- a/libs/common/src/tools/rx.ts +++ b/libs/common/src/tools/rx.ts @@ -14,6 +14,7 @@ import { first, takeUntil, withLatestFrom, + concatMap, } from "rxjs"; /** @@ -130,3 +131,31 @@ export function withLatestReady( ); }); } + +/** + * Create an observable that emits the latest value of the source stream + * when `watch$` emits. If `watch$` emits before the stream emits, then + * an emission occurs as soon as a value becomes ready. + * @param watch$ the observable that triggers emissions + * @returns An observable that emits when `watch$` emits. The observable + * errors if its source stream errors. It also errors if `on` errors. It + * completes if its watch completes. + * + * @remarks This works like `audit`, but it repeats emissions when + * watch$ fires. + */ +export function on(watch$: Observable) { + return pipe( + connect>((source$) => { + const source = new ReplaySubject(1); + source$.subscribe(source); + + return watch$ + .pipe( + ready(source), + concatMap(() => source.pipe(first())), + ) + .pipe(takeUntil(anyComplete(source))); + }), + ); +} diff --git a/libs/tools/generator/core/src/rx.spec.ts b/libs/tools/generator/core/src/rx.spec.ts deleted file mode 100644 index b98e79bb07..0000000000 --- a/libs/tools/generator/core/src/rx.spec.ts +++ /dev/null @@ -1,352 +0,0 @@ -import { EmptyError, Subject, tap } from "rxjs"; - -import { anyComplete, on, ready } from "./rx"; - -describe("anyComplete", () => { - it("emits true when its input completes", () => { - const input$ = new Subject(); - - const emissions: boolean[] = []; - anyComplete(input$).subscribe((e) => emissions.push(e)); - input$.complete(); - - expect(emissions).toEqual([true]); - }); - - it("completes when its input is already complete", () => { - const input = new Subject(); - input.complete(); - - let completed = false; - anyComplete(input).subscribe({ complete: () => (completed = true) }); - - expect(completed).toBe(true); - }); - - it("completes when any input completes", () => { - const input$ = new Subject(); - const completing$ = new Subject(); - - let completed = false; - anyComplete([input$, completing$]).subscribe({ complete: () => (completed = true) }); - completing$.complete(); - - expect(completed).toBe(true); - }); - - it("ignores emissions", () => { - const input$ = new Subject(); - - const emissions: boolean[] = []; - anyComplete(input$).subscribe((e) => emissions.push(e)); - input$.next(1); - input$.next(2); - input$.complete(); - - expect(emissions).toEqual([true]); - }); - - it("forwards errors", () => { - const input$ = new Subject(); - const expected = { some: "error" }; - - let error = null; - anyComplete(input$).subscribe({ error: (e: unknown) => (error = e) }); - input$.error(expected); - - expect(error).toEqual(expected); - }); -}); - -describe("ready", () => { - it("connects when subscribed", () => { - const watch$ = new Subject(); - let connected = false; - const source$ = new Subject().pipe(tap({ subscribe: () => (connected = true) })); - - // precondition: ready$ should be cold - const ready$ = source$.pipe(ready(watch$)); - expect(connected).toBe(false); - - ready$.subscribe(); - - expect(connected).toBe(true); - }); - - it("suppresses source emissions until its watch emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - const results: number[] = []; - ready$.subscribe((n) => results.push(n)); - - // precondition: no emissions - source$.next(1); - expect(results).toEqual([]); - - watch$.next(); - - expect(results).toEqual([1]); - }); - - it("suppresses source emissions until all watches emit", () => { - const watchA$ = new Subject(); - const watchB$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready([watchA$, watchB$])); - const results: number[] = []; - ready$.subscribe((n) => results.push(n)); - - // preconditions: no emissions - source$.next(1); - expect(results).toEqual([]); - watchA$.next(); - expect(results).toEqual([]); - - watchB$.next(); - - expect(results).toEqual([1]); - }); - - it("emits the last source emission when its watch emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - const results: number[] = []; - ready$.subscribe((n) => results.push(n)); - - // precondition: no emissions - source$.next(1); - expect(results).toEqual([]); - - source$.next(2); - watch$.next(); - - expect(results).toEqual([2]); - }); - - it("emits all source emissions after its watch emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - const results: number[] = []; - ready$.subscribe((n) => results.push(n)); - - watch$.next(); - source$.next(1); - source$.next(2); - - expect(results).toEqual([1, 2]); - }); - - it("ignores repeated watch emissions", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - const results: number[] = []; - ready$.subscribe((n) => results.push(n)); - - watch$.next(); - source$.next(1); - watch$.next(); - source$.next(2); - watch$.next(); - - expect(results).toEqual([1, 2]); - }); - - it("completes when its source completes", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - let completed = false; - ready$.subscribe({ complete: () => (completed = true) }); - - source$.complete(); - - expect(completed).toBeTruthy(); - }); - - it("errors when its source errors", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - const expected = { some: "error" }; - let error = null; - ready$.subscribe({ error: (e: unknown) => (error = e) }); - - source$.error(expected); - - expect(error).toEqual(expected); - }); - - it("errors when its watch errors", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - const expected = { some: "error" }; - let error = null; - ready$.subscribe({ error: (e: unknown) => (error = e) }); - - watch$.error(expected); - - expect(error).toEqual(expected); - }); - - it("errors when its watch completes before emitting", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const ready$ = source$.pipe(ready(watch$)); - let error = null; - ready$.subscribe({ error: (e: unknown) => (error = e) }); - - watch$.complete(); - - expect(error).toBeInstanceOf(EmptyError); - }); -}); - -describe("on", () => { - it("connects when subscribed", () => { - const watch$ = new Subject(); - let connected = false; - const source$ = new Subject().pipe(tap({ subscribe: () => (connected = true) })); - - // precondition: on$ should be cold - const on$ = source$.pipe(on(watch$)); - expect(connected).toBeFalsy(); - - on$.subscribe(); - - expect(connected).toBeTruthy(); - }); - - it("suppresses source emissions until `on` emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const results: number[] = []; - source$.pipe(on(watch$)).subscribe((n) => results.push(n)); - - // precondition: on$ should be cold - source$.next(1); - expect(results).toEqual([]); - - watch$.next(); - - expect(results).toEqual([1]); - }); - - it("repeats source emissions when `on` emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const results: number[] = []; - source$.pipe(on(watch$)).subscribe((n) => results.push(n)); - source$.next(1); - - watch$.next(); - watch$.next(); - - expect(results).toEqual([1, 1]); - }); - - it("updates source emissions when `on` emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const results: number[] = []; - source$.pipe(on(watch$)).subscribe((n) => results.push(n)); - - source$.next(1); - watch$.next(); - source$.next(2); - watch$.next(); - - expect(results).toEqual([1, 2]); - }); - - it("emits a value when `on` emits before the source is ready", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const results: number[] = []; - source$.pipe(on(watch$)).subscribe((n) => results.push(n)); - - watch$.next(); - source$.next(1); - - expect(results).toEqual([1]); - }); - - it("ignores repeated `on` emissions before the source is ready", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const results: number[] = []; - source$.pipe(on(watch$)).subscribe((n) => results.push(n)); - - watch$.next(); - watch$.next(); - source$.next(1); - - expect(results).toEqual([1]); - }); - - it("emits only the latest source emission when `on` emits", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const results: number[] = []; - source$.pipe(on(watch$)).subscribe((n) => results.push(n)); - source$.next(1); - - watch$.next(); - - source$.next(2); - source$.next(3); - watch$.next(); - - expect(results).toEqual([1, 3]); - }); - - it("completes when its source completes", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - let complete: boolean = false; - source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) }); - - source$.complete(); - - expect(complete).toBeTruthy(); - }); - - it("completes when its watch completes", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - let complete: boolean = false; - source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) }); - - watch$.complete(); - - expect(complete).toBeTruthy(); - }); - - it("errors when its source errors", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const expected = { some: "error" }; - let error = null; - source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) }); - - source$.error(expected); - - expect(error).toEqual(expected); - }); - - it("errors when its watch errors", () => { - const watch$ = new Subject(); - const source$ = new Subject(); - const expected = { some: "error" }; - let error = null; - source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) }); - - watch$.error(expected); - - expect(error).toEqual(expected); - }); -}); diff --git a/libs/tools/generator/core/src/rx.ts b/libs/tools/generator/core/src/rx.ts index 851b6cfe7c..070d34d37d 100644 --- a/libs/tools/generator/core/src/rx.ts +++ b/libs/tools/generator/core/src/rx.ts @@ -1,18 +1,4 @@ -import { - concat, - concatMap, - connect, - endWith, - first, - ignoreElements, - map, - Observable, - pipe, - race, - ReplaySubject, - takeUntil, - zip, -} from "rxjs"; +import { map, pipe } from "rxjs"; import { reduceCollection, distinctIfShallowMatch } from "@bitwarden/common/tools/rx"; @@ -51,86 +37,3 @@ export function newDefaultEvaluator() { return pipe(map((_) => new DefaultPolicyEvaluator())); }; } - -/** Create an observable that, once subscribed, emits `true` then completes when - * any input completes. If an input is already complete when the subscription - * occurs, it emits immediately. - * @param watch$ the observable(s) to watch for completion; if an array is passed, - * null and undefined members are ignored. If `watch$` is empty, `anyComplete` - * will never complete. - * @returns An observable that emits `true` when any of its inputs - * complete. The observable forwards the first error from its input. - * @remarks This method is particularly useful in combination with `takeUntil` and - * streams that are not guaranteed to complete on their own. - */ -export function anyComplete(watch$: Observable | Observable[]): Observable { - if (Array.isArray(watch$)) { - const completes$ = watch$ - .filter((w$) => !!w$) - .map((w$) => w$.pipe(ignoreElements(), endWith(true))); - const completed$ = race(completes$); - return completed$; - } else { - return watch$.pipe(ignoreElements(), endWith(true)); - } -} - -/** - * Create an observable that delays the input stream until all watches have - * emitted a value. The watched values are not included in the source stream. - * The last emission from the source is output when all the watches have - * emitted at least once. - * @param watch$ the observable(s) to watch for readiness. If `watch$` is empty, - * `ready` will never emit. - * @returns An observable that emits when the source stream emits. The observable - * errors if one of its watches completes before emitting. It also errors if one - * of its watches errors. - */ -export function ready(watch$: Observable | Observable[]) { - const watching$ = Array.isArray(watch$) ? watch$ : [watch$]; - return pipe( - connect>((source$) => { - // this subscription is safe because `source$` connects only after there - // is an external subscriber. - const source = new ReplaySubject(1); - source$.subscribe(source); - - // `concat` is subscribed immediately after it's returned, at which point - // `zip` blocks until all items in `watching$` are ready. If that occurs - // after `source$` is hot, then the replay subject sends the last-captured - // emission through immediately. Otherwise, `ready` waits for the next - // emission - return concat(zip(watching$).pipe(first(), ignoreElements()), source).pipe( - takeUntil(anyComplete(source)), - ); - }), - ); -} - -/** - * Create an observable that emits the latest value of the source stream - * when `watch$` emits. If `watch$` emits before the stream emits, then - * an emission occurs as soon as a value becomes ready. - * @param watch$ the observable that triggers emissions - * @returns An observable that emits when `watch$` emits. The observable - * errors if its source stream errors. It also errors if `on` errors. It - * completes if its watch completes. - * - * @remarks This works like `audit`, but it repeats emissions when - * watch$ fires. - */ -export function on(watch$: Observable) { - return pipe( - connect>((source$) => { - const source = new ReplaySubject(1); - source$.subscribe(source); - - return watch$ - .pipe( - ready(source), - concatMap(() => source.pipe(first())), - ) - .pipe(takeUntil(anyComplete(source))); - }), - ); -} diff --git a/libs/tools/generator/core/src/services/credential-generator.service.ts b/libs/tools/generator/core/src/services/credential-generator.service.ts index 9b372d98b3..b170cf4c97 100644 --- a/libs/tools/generator/core/src/services/credential-generator.service.ts +++ b/libs/tools/generator/core/src/services/credential-generator.service.ts @@ -303,8 +303,11 @@ export class CredentialGeneratorService { ); // FIXME: enforce policy - const state = this.stateProvider.getUser(userId, PREFERENCES); - const subject = new UserStateSubject(state, { ...dependencies }); + const subject = new UserStateSubject( + PREFERENCES, + (key) => this.stateProvider.getUser(userId, key), + { ...dependencies }, + ); return subject; } @@ -323,10 +326,14 @@ export class CredentialGeneratorService { const userId = await firstValueFrom( dependencies.singleUserId$.pipe(filter((userId) => !!userId)), ); - const state = this.stateProvider.getUser(userId, configuration.settings.account); + const constraints$ = this.policy$(configuration, { userId$: dependencies.singleUserId$ }); - const subject = new UserStateSubject(state, { ...dependencies, constraints$ }); + const subject = new UserStateSubject( + configuration.settings.account, + (key) => this.stateProvider.getUser(userId, key), + { ...dependencies, constraints$ }, + ); return subject; }