mirror of
https://github.com/bitwarden/browser.git
synced 2024-10-22 07:50:04 +02:00
move generic rx utilities to common
This commit is contained in:
parent
273c116749
commit
c48ddd8ecd
@ -2,11 +2,11 @@
|
||||
* include structuredClone in test environment.
|
||||
* @jest-environment ../../../../shared/test.environment.ts
|
||||
*/
|
||||
import { of, firstValueFrom } from "rxjs";
|
||||
import { of, firstValueFrom, Subject, tap, EmptyError } from "rxjs";
|
||||
|
||||
import { awaitAsync, trackEmissions } from "../../spec";
|
||||
|
||||
import { distinctIfShallowMatch, reduceCollection } from "./rx";
|
||||
import { anyComplete, distinctIfShallowMatch, on, ready, reduceCollection } from "./rx";
|
||||
|
||||
describe("reduceCollection", () => {
|
||||
it.each([[null], [undefined], [[]]])(
|
||||
@ -84,3 +84,352 @@ describe("distinctIfShallowMatch", () => {
|
||||
expect(result).toEqual([{ foo: true, bar: true }]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("anyComplete", () => {
|
||||
it("emits true when its input completes", () => {
|
||||
const input$ = new Subject<void>();
|
||||
|
||||
const emissions: boolean[] = [];
|
||||
anyComplete(input$).subscribe((e) => emissions.push(e));
|
||||
input$.complete();
|
||||
|
||||
expect(emissions).toEqual([true]);
|
||||
});
|
||||
|
||||
it("completes when its input is already complete", () => {
|
||||
const input = new Subject<void>();
|
||||
input.complete();
|
||||
|
||||
let completed = false;
|
||||
anyComplete(input).subscribe({ complete: () => (completed = true) });
|
||||
|
||||
expect(completed).toBe(true);
|
||||
});
|
||||
|
||||
it("completes when any input completes", () => {
|
||||
const input$ = new Subject<void>();
|
||||
const completing$ = new Subject<void>();
|
||||
|
||||
let completed = false;
|
||||
anyComplete([input$, completing$]).subscribe({ complete: () => (completed = true) });
|
||||
completing$.complete();
|
||||
|
||||
expect(completed).toBe(true);
|
||||
});
|
||||
|
||||
it("ignores emissions", () => {
|
||||
const input$ = new Subject<number>();
|
||||
|
||||
const emissions: boolean[] = [];
|
||||
anyComplete(input$).subscribe((e) => emissions.push(e));
|
||||
input$.next(1);
|
||||
input$.next(2);
|
||||
input$.complete();
|
||||
|
||||
expect(emissions).toEqual([true]);
|
||||
});
|
||||
|
||||
it("forwards errors", () => {
|
||||
const input$ = new Subject<void>();
|
||||
const expected = { some: "error" };
|
||||
|
||||
let error = null;
|
||||
anyComplete(input$).subscribe({ error: (e: unknown) => (error = e) });
|
||||
input$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ready", () => {
|
||||
it("connects when subscribed", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
let connected = false;
|
||||
const source$ = new Subject<number>().pipe(tap({ subscribe: () => (connected = true) }));
|
||||
|
||||
// precondition: ready$ should be cold
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
expect(connected).toBe(false);
|
||||
|
||||
ready$.subscribe();
|
||||
|
||||
expect(connected).toBe(true);
|
||||
});
|
||||
|
||||
it("suppresses source emissions until its watch emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
// precondition: no emissions
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("suppresses source emissions until all watches emit", () => {
|
||||
const watchA$ = new Subject<void>();
|
||||
const watchB$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready([watchA$, watchB$]));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
// preconditions: no emissions
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
watchA$.next();
|
||||
expect(results).toEqual([]);
|
||||
|
||||
watchB$.next();
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("emits the last source emission when its watch emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
// precondition: no emissions
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
|
||||
source$.next(2);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([2]);
|
||||
});
|
||||
|
||||
it("emits all source emissions after its watch emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
source$.next(2);
|
||||
|
||||
expect(results).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("ignores repeated watch emissions", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
watch$.next();
|
||||
source$.next(2);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("completes when its source completes", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
let completed = false;
|
||||
ready$.subscribe({ complete: () => (completed = true) });
|
||||
|
||||
source$.complete();
|
||||
|
||||
expect(completed).toBeTruthy();
|
||||
});
|
||||
|
||||
it("errors when its source errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
ready$.subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
source$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when its watch errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
ready$.subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
watch$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when its watch completes before emitting", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
let error = null;
|
||||
ready$.subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
watch$.complete();
|
||||
|
||||
expect(error).toBeInstanceOf(EmptyError);
|
||||
});
|
||||
});
|
||||
|
||||
describe("on", () => {
|
||||
it("connects when subscribed", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
let connected = false;
|
||||
const source$ = new Subject<number>().pipe(tap({ subscribe: () => (connected = true) }));
|
||||
|
||||
// precondition: on$ should be cold
|
||||
const on$ = source$.pipe(on(watch$));
|
||||
expect(connected).toBeFalsy();
|
||||
|
||||
on$.subscribe();
|
||||
|
||||
expect(connected).toBeTruthy();
|
||||
});
|
||||
|
||||
it("suppresses source emissions until `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
// precondition: on$ should be cold
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("repeats source emissions when `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
source$.next(1);
|
||||
|
||||
watch$.next();
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 1]);
|
||||
});
|
||||
|
||||
it("updates source emissions when `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
source$.next(1);
|
||||
watch$.next();
|
||||
source$.next(2);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("emits a value when `on` emits before the source is ready", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("ignores repeated `on` emissions before the source is ready", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("emits only the latest source emission when `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
source$.next(1);
|
||||
|
||||
watch$.next();
|
||||
|
||||
source$.next(2);
|
||||
source$.next(3);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 3]);
|
||||
});
|
||||
|
||||
it("completes when its source completes", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
let complete: boolean = false;
|
||||
source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) });
|
||||
|
||||
source$.complete();
|
||||
|
||||
expect(complete).toBeTruthy();
|
||||
});
|
||||
|
||||
it("completes when its watch completes", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
let complete: boolean = false;
|
||||
source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) });
|
||||
|
||||
watch$.complete();
|
||||
|
||||
expect(complete).toBeTruthy();
|
||||
});
|
||||
|
||||
it("errors when its source errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
source$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when its watch errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
watch$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
});
|
||||
|
@ -14,6 +14,7 @@ import {
|
||||
first,
|
||||
takeUntil,
|
||||
withLatestFrom,
|
||||
concatMap,
|
||||
} from "rxjs";
|
||||
|
||||
/**
|
||||
@ -130,3 +131,31 @@ export function withLatestReady<Source, Watch>(
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an observable that emits the latest value of the source stream
|
||||
* when `watch$` emits. If `watch$` emits before the stream emits, then
|
||||
* an emission occurs as soon as a value becomes ready.
|
||||
* @param watch$ the observable that triggers emissions
|
||||
* @returns An observable that emits when `watch$` emits. The observable
|
||||
* errors if its source stream errors. It also errors if `on` errors. It
|
||||
* completes if its watch completes.
|
||||
*
|
||||
* @remarks This works like `audit`, but it repeats emissions when
|
||||
* watch$ fires.
|
||||
*/
|
||||
export function on<T>(watch$: Observable<any>) {
|
||||
return pipe(
|
||||
connect<T, Observable<T>>((source$) => {
|
||||
const source = new ReplaySubject<T>(1);
|
||||
source$.subscribe(source);
|
||||
|
||||
return watch$
|
||||
.pipe(
|
||||
ready(source),
|
||||
concatMap(() => source.pipe(first())),
|
||||
)
|
||||
.pipe(takeUntil(anyComplete(source)));
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
@ -1,352 +0,0 @@
|
||||
import { EmptyError, Subject, tap } from "rxjs";
|
||||
|
||||
import { anyComplete, on, ready } from "./rx";
|
||||
|
||||
describe("anyComplete", () => {
|
||||
it("emits true when its input completes", () => {
|
||||
const input$ = new Subject<void>();
|
||||
|
||||
const emissions: boolean[] = [];
|
||||
anyComplete(input$).subscribe((e) => emissions.push(e));
|
||||
input$.complete();
|
||||
|
||||
expect(emissions).toEqual([true]);
|
||||
});
|
||||
|
||||
it("completes when its input is already complete", () => {
|
||||
const input = new Subject<void>();
|
||||
input.complete();
|
||||
|
||||
let completed = false;
|
||||
anyComplete(input).subscribe({ complete: () => (completed = true) });
|
||||
|
||||
expect(completed).toBe(true);
|
||||
});
|
||||
|
||||
it("completes when any input completes", () => {
|
||||
const input$ = new Subject<void>();
|
||||
const completing$ = new Subject<void>();
|
||||
|
||||
let completed = false;
|
||||
anyComplete([input$, completing$]).subscribe({ complete: () => (completed = true) });
|
||||
completing$.complete();
|
||||
|
||||
expect(completed).toBe(true);
|
||||
});
|
||||
|
||||
it("ignores emissions", () => {
|
||||
const input$ = new Subject<number>();
|
||||
|
||||
const emissions: boolean[] = [];
|
||||
anyComplete(input$).subscribe((e) => emissions.push(e));
|
||||
input$.next(1);
|
||||
input$.next(2);
|
||||
input$.complete();
|
||||
|
||||
expect(emissions).toEqual([true]);
|
||||
});
|
||||
|
||||
it("forwards errors", () => {
|
||||
const input$ = new Subject<void>();
|
||||
const expected = { some: "error" };
|
||||
|
||||
let error = null;
|
||||
anyComplete(input$).subscribe({ error: (e: unknown) => (error = e) });
|
||||
input$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ready", () => {
|
||||
it("connects when subscribed", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
let connected = false;
|
||||
const source$ = new Subject<number>().pipe(tap({ subscribe: () => (connected = true) }));
|
||||
|
||||
// precondition: ready$ should be cold
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
expect(connected).toBe(false);
|
||||
|
||||
ready$.subscribe();
|
||||
|
||||
expect(connected).toBe(true);
|
||||
});
|
||||
|
||||
it("suppresses source emissions until its watch emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
// precondition: no emissions
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("suppresses source emissions until all watches emit", () => {
|
||||
const watchA$ = new Subject<void>();
|
||||
const watchB$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready([watchA$, watchB$]));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
// preconditions: no emissions
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
watchA$.next();
|
||||
expect(results).toEqual([]);
|
||||
|
||||
watchB$.next();
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("emits the last source emission when its watch emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
// precondition: no emissions
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
|
||||
source$.next(2);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([2]);
|
||||
});
|
||||
|
||||
it("emits all source emissions after its watch emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
source$.next(2);
|
||||
|
||||
expect(results).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("ignores repeated watch emissions", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const results: number[] = [];
|
||||
ready$.subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
watch$.next();
|
||||
source$.next(2);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("completes when its source completes", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
let completed = false;
|
||||
ready$.subscribe({ complete: () => (completed = true) });
|
||||
|
||||
source$.complete();
|
||||
|
||||
expect(completed).toBeTruthy();
|
||||
});
|
||||
|
||||
it("errors when its source errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
ready$.subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
source$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when its watch errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
ready$.subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
watch$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when its watch completes before emitting", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const ready$ = source$.pipe(ready(watch$));
|
||||
let error = null;
|
||||
ready$.subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
watch$.complete();
|
||||
|
||||
expect(error).toBeInstanceOf(EmptyError);
|
||||
});
|
||||
});
|
||||
|
||||
describe("on", () => {
|
||||
it("connects when subscribed", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
let connected = false;
|
||||
const source$ = new Subject<number>().pipe(tap({ subscribe: () => (connected = true) }));
|
||||
|
||||
// precondition: on$ should be cold
|
||||
const on$ = source$.pipe(on(watch$));
|
||||
expect(connected).toBeFalsy();
|
||||
|
||||
on$.subscribe();
|
||||
|
||||
expect(connected).toBeTruthy();
|
||||
});
|
||||
|
||||
it("suppresses source emissions until `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
// precondition: on$ should be cold
|
||||
source$.next(1);
|
||||
expect(results).toEqual([]);
|
||||
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("repeats source emissions when `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
source$.next(1);
|
||||
|
||||
watch$.next();
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 1]);
|
||||
});
|
||||
|
||||
it("updates source emissions when `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
source$.next(1);
|
||||
watch$.next();
|
||||
source$.next(2);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("emits a value when `on` emits before the source is ready", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("ignores repeated `on` emissions before the source is ready", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
|
||||
watch$.next();
|
||||
watch$.next();
|
||||
source$.next(1);
|
||||
|
||||
expect(results).toEqual([1]);
|
||||
});
|
||||
|
||||
it("emits only the latest source emission when `on` emits", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const results: number[] = [];
|
||||
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
|
||||
source$.next(1);
|
||||
|
||||
watch$.next();
|
||||
|
||||
source$.next(2);
|
||||
source$.next(3);
|
||||
watch$.next();
|
||||
|
||||
expect(results).toEqual([1, 3]);
|
||||
});
|
||||
|
||||
it("completes when its source completes", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
let complete: boolean = false;
|
||||
source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) });
|
||||
|
||||
source$.complete();
|
||||
|
||||
expect(complete).toBeTruthy();
|
||||
});
|
||||
|
||||
it("completes when its watch completes", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
let complete: boolean = false;
|
||||
source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) });
|
||||
|
||||
watch$.complete();
|
||||
|
||||
expect(complete).toBeTruthy();
|
||||
});
|
||||
|
||||
it("errors when its source errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
source$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when its watch errors", () => {
|
||||
const watch$ = new Subject<void>();
|
||||
const source$ = new Subject<number>();
|
||||
const expected = { some: "error" };
|
||||
let error = null;
|
||||
source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) });
|
||||
|
||||
watch$.error(expected);
|
||||
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
});
|
@ -1,18 +1,4 @@
|
||||
import {
|
||||
concat,
|
||||
concatMap,
|
||||
connect,
|
||||
endWith,
|
||||
first,
|
||||
ignoreElements,
|
||||
map,
|
||||
Observable,
|
||||
pipe,
|
||||
race,
|
||||
ReplaySubject,
|
||||
takeUntil,
|
||||
zip,
|
||||
} from "rxjs";
|
||||
import { map, pipe } from "rxjs";
|
||||
|
||||
import { reduceCollection, distinctIfShallowMatch } from "@bitwarden/common/tools/rx";
|
||||
|
||||
@ -51,86 +37,3 @@ export function newDefaultEvaluator<Target>() {
|
||||
return pipe(map((_) => new DefaultPolicyEvaluator<Target>()));
|
||||
};
|
||||
}
|
||||
|
||||
/** Create an observable that, once subscribed, emits `true` then completes when
|
||||
* any input completes. If an input is already complete when the subscription
|
||||
* occurs, it emits immediately.
|
||||
* @param watch$ the observable(s) to watch for completion; if an array is passed,
|
||||
* null and undefined members are ignored. If `watch$` is empty, `anyComplete`
|
||||
* will never complete.
|
||||
* @returns An observable that emits `true` when any of its inputs
|
||||
* complete. The observable forwards the first error from its input.
|
||||
* @remarks This method is particularly useful in combination with `takeUntil` and
|
||||
* streams that are not guaranteed to complete on their own.
|
||||
*/
|
||||
export function anyComplete(watch$: Observable<any> | Observable<any>[]): Observable<any> {
|
||||
if (Array.isArray(watch$)) {
|
||||
const completes$ = watch$
|
||||
.filter((w$) => !!w$)
|
||||
.map((w$) => w$.pipe(ignoreElements(), endWith(true)));
|
||||
const completed$ = race(completes$);
|
||||
return completed$;
|
||||
} else {
|
||||
return watch$.pipe(ignoreElements(), endWith(true));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an observable that delays the input stream until all watches have
|
||||
* emitted a value. The watched values are not included in the source stream.
|
||||
* The last emission from the source is output when all the watches have
|
||||
* emitted at least once.
|
||||
* @param watch$ the observable(s) to watch for readiness. If `watch$` is empty,
|
||||
* `ready` will never emit.
|
||||
* @returns An observable that emits when the source stream emits. The observable
|
||||
* errors if one of its watches completes before emitting. It also errors if one
|
||||
* of its watches errors.
|
||||
*/
|
||||
export function ready<T>(watch$: Observable<any> | Observable<any>[]) {
|
||||
const watching$ = Array.isArray(watch$) ? watch$ : [watch$];
|
||||
return pipe(
|
||||
connect<T, Observable<T>>((source$) => {
|
||||
// this subscription is safe because `source$` connects only after there
|
||||
// is an external subscriber.
|
||||
const source = new ReplaySubject<T>(1);
|
||||
source$.subscribe(source);
|
||||
|
||||
// `concat` is subscribed immediately after it's returned, at which point
|
||||
// `zip` blocks until all items in `watching$` are ready. If that occurs
|
||||
// after `source$` is hot, then the replay subject sends the last-captured
|
||||
// emission through immediately. Otherwise, `ready` waits for the next
|
||||
// emission
|
||||
return concat(zip(watching$).pipe(first(), ignoreElements()), source).pipe(
|
||||
takeUntil(anyComplete(source)),
|
||||
);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an observable that emits the latest value of the source stream
|
||||
* when `watch$` emits. If `watch$` emits before the stream emits, then
|
||||
* an emission occurs as soon as a value becomes ready.
|
||||
* @param watch$ the observable that triggers emissions
|
||||
* @returns An observable that emits when `watch$` emits. The observable
|
||||
* errors if its source stream errors. It also errors if `on` errors. It
|
||||
* completes if its watch completes.
|
||||
*
|
||||
* @remarks This works like `audit`, but it repeats emissions when
|
||||
* watch$ fires.
|
||||
*/
|
||||
export function on<T>(watch$: Observable<any>) {
|
||||
return pipe(
|
||||
connect<T, Observable<T>>((source$) => {
|
||||
const source = new ReplaySubject<T>(1);
|
||||
source$.subscribe(source);
|
||||
|
||||
return watch$
|
||||
.pipe(
|
||||
ready(source),
|
||||
concatMap(() => source.pipe(first())),
|
||||
)
|
||||
.pipe(takeUntil(anyComplete(source)));
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
@ -303,8 +303,11 @@ export class CredentialGeneratorService {
|
||||
);
|
||||
|
||||
// FIXME: enforce policy
|
||||
const state = this.stateProvider.getUser(userId, PREFERENCES);
|
||||
const subject = new UserStateSubject(state, { ...dependencies });
|
||||
const subject = new UserStateSubject(
|
||||
PREFERENCES,
|
||||
(key) => this.stateProvider.getUser(userId, key),
|
||||
{ ...dependencies },
|
||||
);
|
||||
|
||||
return subject;
|
||||
}
|
||||
@ -323,10 +326,14 @@ export class CredentialGeneratorService {
|
||||
const userId = await firstValueFrom(
|
||||
dependencies.singleUserId$.pipe(filter((userId) => !!userId)),
|
||||
);
|
||||
const state = this.stateProvider.getUser(userId, configuration.settings.account);
|
||||
|
||||
const constraints$ = this.policy$(configuration, { userId$: dependencies.singleUserId$ });
|
||||
|
||||
const subject = new UserStateSubject(state, { ...dependencies, constraints$ });
|
||||
const subject = new UserStateSubject(
|
||||
configuration.settings.account,
|
||||
(key) => this.stateProvider.getUser(userId, key),
|
||||
{ ...dependencies, constraints$ },
|
||||
);
|
||||
|
||||
return subject;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user