refactor(nx): rxjs lettable operators
This commit is contained in:
parent
058c8995f3
commit
715efa4b22
@ -49,7 +49,7 @@
|
||||
"karma-jasmine": "~1.1.0",
|
||||
"karma-webpack": "2.0.4",
|
||||
"prettier": "1.7.4",
|
||||
"rxjs": "5.4.3",
|
||||
"rxjs": "5.5.2",
|
||||
"tslint": "5.7.0",
|
||||
"typescript": "2.5.3"
|
||||
},
|
||||
|
||||
@ -4,10 +4,12 @@
|
||||
"description": "Nrwl Extensions for Angular",
|
||||
"main": "index.js",
|
||||
"types": "index.d.js",
|
||||
"dependencies" :{
|
||||
"dependencies": {
|
||||
"jasmine-marbles": "0.1.0"
|
||||
},
|
||||
"peerDependencies" :{},
|
||||
"peerDependencies": {
|
||||
"rxjs": "^5.5.0"
|
||||
},
|
||||
"author": "Victor Savkin",
|
||||
"license": "MIT"
|
||||
}
|
||||
|
||||
@ -1,5 +1,3 @@
|
||||
import 'rxjs/add/operator/delay';
|
||||
|
||||
import { Component, Injectable } from '@angular/core';
|
||||
import { ComponentFixture, fakeAsync, TestBed, tick } from '@angular/core/testing';
|
||||
import { ActivatedRouteSnapshot, Router } from '@angular/router';
|
||||
@ -11,7 +9,7 @@ import { Store, StoreModule } from '@ngrx/store';
|
||||
import { Observable } from 'rxjs/Observable';
|
||||
import { of } from 'rxjs/observable/of';
|
||||
import { _throw } from 'rxjs/observable/throw';
|
||||
import { delay } from 'rxjs/operator/delay';
|
||||
import { delay } from 'rxjs/operators';
|
||||
import { Subject } from 'rxjs/Subject';
|
||||
|
||||
import { DataPersistence } from '../index';
|
||||
@ -249,7 +247,7 @@ describe('DataPersistence', () => {
|
||||
return of({
|
||||
type: 'TODOS',
|
||||
payload: { user: state.user, todos: 'some todos' }
|
||||
}).delay(1);
|
||||
}).pipe(delay(1));
|
||||
},
|
||||
|
||||
onError: (a, e: any) => null
|
||||
@ -295,7 +293,7 @@ describe('DataPersistence', () => {
|
||||
@Effect()
|
||||
loadTodo = this.s.fetch<GetTodo>('GET_TODO', {
|
||||
id: (a, state) => a.payload.id,
|
||||
run: (a, state) => of({ type: 'TODO', payload: a.payload }).delay(1),
|
||||
run: (a, state) => of({ type: 'TODO', payload: a.payload }).pipe(delay(1)),
|
||||
onError: (a, e: any) => null
|
||||
});
|
||||
|
||||
|
||||
@ -5,14 +5,7 @@ import { ROUTER_NAVIGATION, RouterNavigationAction } from '@ngrx/router-store';
|
||||
import { Action, State, Store } from '@ngrx/store';
|
||||
import { Observable } from 'rxjs/Observable';
|
||||
import { of } from 'rxjs/observable/of';
|
||||
import { _catch } from 'rxjs/operator/catch';
|
||||
import { concatMap } from 'rxjs/operator/concatMap';
|
||||
import { filter } from 'rxjs/operator/filter';
|
||||
import { groupBy } from 'rxjs/operator/groupBy';
|
||||
import { map } from 'rxjs/operator/map';
|
||||
import { mergeMap } from 'rxjs/operator/mergeMap';
|
||||
import { switchMap } from 'rxjs/operator/switchMap';
|
||||
import { withLatestFrom } from 'rxjs/operator/withLatestFrom';
|
||||
import { catchError, concatMap, filter, groupBy, map, mergeMap, switchMap, withLatestFrom } from 'rxjs/operators';
|
||||
|
||||
/**
|
||||
* See {@link DataPersistence.pessimisticUpdate} for more information.
|
||||
@ -46,8 +39,6 @@ export interface HandleNavigationOpts<T> {
|
||||
onError?(a: ActivatedRouteSnapshot, e: any): Observable<any> | any;
|
||||
}
|
||||
|
||||
type Pair = [Action, any];
|
||||
|
||||
/**
|
||||
* @whatItDoes Provides convenience methods for implementing common operations of persisting data.
|
||||
*/
|
||||
@ -91,10 +82,10 @@ export class DataPersistence<T> {
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
pessimisticUpdate<A = Action>(actionType: string, opts: PessimisticUpdateOpts<T, A>): Observable<any> {
|
||||
const nav = this.actions.ofType(actionType);
|
||||
const pairs = withLatestFrom.call(nav, this.store);
|
||||
return concatMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError));
|
||||
pessimisticUpdate<A extends Action = Action>(actionType: string, opts: PessimisticUpdateOpts<T, A>): Observable<any> {
|
||||
const nav = this.actions.ofType<A>(actionType);
|
||||
const pairs = nav.pipe(withLatestFrom(this.store));
|
||||
return pairs.pipe(concatMap(this.runWithErrorHandling(opts.run, opts.onError)));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -133,10 +124,10 @@ export class DataPersistence<T> {
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
optimisticUpdate<A = Action>(actionType: string, opts: OptimisticUpdateOpts<T, A>): Observable<any> {
|
||||
const nav = this.actions.ofType(actionType);
|
||||
const pairs = withLatestFrom.call(nav, this.store);
|
||||
return concatMap.call(pairs, this.runWithErrorHandling(opts.run, opts.undoAction));
|
||||
optimisticUpdate<A extends Action = Action>(actionType: string, opts: OptimisticUpdateOpts<T, A>): Observable<any> {
|
||||
const nav = this.actions.ofType<A>(actionType);
|
||||
const pairs = nav.pipe(withLatestFrom(this.store));
|
||||
return pairs.pipe(concatMap(this.runWithErrorHandling(opts.run, opts.undoAction)));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -206,17 +197,17 @@ export class DataPersistence<T> {
|
||||
* In addition, if DataPersistence notices that there are multiple requests for Todo 1 scheduled,
|
||||
* it will only run the last one.
|
||||
*/
|
||||
fetch<A = Action>(actionType: string, opts: FetchOpts<T, A>): Observable<any> {
|
||||
const nav = this.actions.ofType(actionType);
|
||||
const allPairs = withLatestFrom.call(nav, this.store);
|
||||
fetch<A extends Action = Action>(actionType: string, opts: FetchOpts<T, A>): Observable<any> {
|
||||
const nav = this.actions.ofType<A>(actionType);
|
||||
const allPairs = nav.pipe(withLatestFrom(this.store));
|
||||
|
||||
if (opts.id) {
|
||||
const groupedFetches: Observable<Observable<Pair>> = groupBy.call(allPairs, p => opts.id(p[0], p[1]));
|
||||
return mergeMap.call(groupedFetches, (pairs: Observable<Pair>) =>
|
||||
switchMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError))
|
||||
const groupedFetches = allPairs.pipe(groupBy(([action, store]) => opts.id(action, store)));
|
||||
return groupedFetches.pipe(
|
||||
mergeMap(pairs => pairs.pipe(switchMap(this.runWithErrorHandling(opts.run, opts.onError))))
|
||||
);
|
||||
} else {
|
||||
return concatMap.call(allPairs, this.runWithErrorHandling(opts.run, opts.onError));
|
||||
return allPairs.pipe(concatMap(this.runWithErrorHandling(opts.run, opts.onError)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -255,24 +246,21 @@ export class DataPersistence<T> {
|
||||
*
|
||||
*/
|
||||
navigation(component: Type<any>, opts: HandleNavigationOpts<T>): Observable<any> {
|
||||
const nav = filter.call(
|
||||
map.call(this.actions.ofType(ROUTER_NAVIGATION), (a: RouterNavigationAction<RouterStateSnapshot>) =>
|
||||
findSnapshot(component, a.payload.routerState.root)
|
||||
),
|
||||
s => !!s
|
||||
);
|
||||
const nav = this.actions
|
||||
.ofType<RouterNavigationAction<RouterStateSnapshot>>(ROUTER_NAVIGATION)
|
||||
.pipe(map(a => findSnapshot(component, a.payload.routerState.root)), filter(s => !!s));
|
||||
|
||||
const pairs = withLatestFrom.call(nav, this.store);
|
||||
return switchMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError));
|
||||
const pairs = nav.pipe(withLatestFrom(this.store));
|
||||
return pairs.pipe(switchMap(this.runWithErrorHandling(opts.run, opts.onError)));
|
||||
}
|
||||
|
||||
private runWithErrorHandling(run: any, onError: any) {
|
||||
return a => {
|
||||
private runWithErrorHandling<A, R>(run: (a: A, state?: T) => Observable<R> | R | void, onError: any) {
|
||||
return ([action, state]: [A, T]): Observable<R> => {
|
||||
try {
|
||||
const r = wrapIntoObservable(run(a[0], a[1]));
|
||||
return _catch.call(r, e => wrapIntoObservable(onError(a[0], e)));
|
||||
const r = wrapIntoObservable(run(action, state));
|
||||
return r.pipe(catchError(e => wrapIntoObservable(onError(action, e))));
|
||||
} catch (e) {
|
||||
return wrapIntoObservable(onError(a[0], e));
|
||||
return wrapIntoObservable(onError(action, e));
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -291,12 +279,12 @@ function findSnapshot(component: Type<any>, s: ActivatedRouteSnapshot): Activate
|
||||
return null;
|
||||
}
|
||||
|
||||
function wrapIntoObservable(obj: any): Observable<any> {
|
||||
if (!!obj && typeof obj.subscribe === 'function') {
|
||||
function wrapIntoObservable<O>(obj: Observable<O> | O | void): Observable<O> {
|
||||
if (!!obj && obj instanceof Observable) {
|
||||
return obj;
|
||||
} else if (!obj) {
|
||||
return of();
|
||||
} else {
|
||||
return of(obj);
|
||||
return of(obj as O);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
import { Observable } from 'rxjs/Observable';
|
||||
import { first } from 'rxjs/operator/first';
|
||||
import { toArray } from 'rxjs/operator/toArray';
|
||||
import { toPromise } from 'rxjs/operator/toPromise';
|
||||
import { first, toArray } from 'rxjs/operators';
|
||||
|
||||
/**
|
||||
* @whatItDoes reads all the values from an observable and returns a promise
|
||||
@ -16,7 +14,7 @@ import { toPromise } from 'rxjs/operator/toPromise';
|
||||
* ```
|
||||
*/
|
||||
export function readAll<T>(o: Observable<T>): Promise<T[]> {
|
||||
return toPromise.call(toArray.call(o));
|
||||
return o.pipe(toArray()).toPromise();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -32,5 +30,5 @@ export function readAll<T>(o: Observable<T>): Promise<T[]> {
|
||||
* ```
|
||||
*/
|
||||
export function readFirst<T>(o: Observable<T>): Promise<T> {
|
||||
return toPromise.call(first.call(o));
|
||||
return o.pipe(first()).toPromise();
|
||||
}
|
||||
|
||||
@ -5149,13 +5149,7 @@ run-queue@^1.0.0, run-queue@^1.0.3:
|
||||
dependencies:
|
||||
aproba "^1.1.1"
|
||||
|
||||
rxjs@5.4.3:
|
||||
version "5.4.3"
|
||||
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.4.3.tgz#0758cddee6033d68e0fd53676f0f3596ce3d483f"
|
||||
dependencies:
|
||||
symbol-observable "^1.0.1"
|
||||
|
||||
rxjs@^5.5.2:
|
||||
rxjs@5.5.2, rxjs@^5.5.2:
|
||||
version "5.5.2"
|
||||
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.2.tgz#28d403f0071121967f18ad665563255d54236ac3"
|
||||
dependencies:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user