From 1e9871e6d7b1de949ef1ff702708bfefb21e8c1e Mon Sep 17 00:00:00 2001 From: Leigh Caplan Date: Thu, 31 May 2018 15:50:11 -0700 Subject: [PATCH] feat(nx): Allow DataPersistence to take action streams Currently, DataPersistence methods such as `fetch` and `optimisticUpdate` take a string as their first argument, which they use to filter incoming action types. This can lead to inflexibility in certain cases, such as when you want to filter the action stream before it gets to the DataPersistence handler, or when you want to handle multiple action types with the same effect (as suggested by Mike Ryan in his "Good Action Hygiene with NgRx talk: https://www.youtube.com/watch?v=JmnsEvoy-gY) This PR refactors `optimisticUpdate`, `pessimisticUpdate`, `fetch` and `navigation` into pipeable operators, and implements the existing DataPersistence methods in terms of these operators. This allows users to continue using instance methods and strings, but enables more advanced cases where more control over the action and state streams is needed. --- packages/nx/spec/data-persistence.spec.ts | 111 ++++++++++++- packages/nx/src/data-persistence.ts | 189 ++++++++++++++++------ 2 files changed, 251 insertions(+), 49 deletions(-) diff --git a/packages/nx/spec/data-persistence.spec.ts b/packages/nx/spec/data-persistence.spec.ts index 07c1f1a26b..5fa92e9938 100644 --- a/packages/nx/spec/data-persistence.spec.ts +++ b/packages/nx/spec/data-persistence.spec.ts @@ -1,17 +1,23 @@ import { Component, Injectable } from '@angular/core'; import { fakeAsync, TestBed, tick } from '@angular/core/testing'; -import { Router } from '@angular/router'; +import { Router, RouterStateSnapshot } from '@angular/router'; import { RouterTestingModule } from '@angular/router/testing'; import { Actions, Effect, EffectsModule } from '@ngrx/effects'; import { provideMockActions } from '@ngrx/effects/testing'; import { StoreRouterConnectingModule } from '@ngrx/router-store'; import { Store, StoreModule } from '@ngrx/store'; import { Observable, of, Subject, throwError } from 'rxjs'; -import { delay } from 'rxjs/operators'; +import { delay, withLatestFrom } from 'rxjs/operators'; import { DataPersistence } from '../index'; import { NxModule } from '../src/nx.module'; import { readAll } from '../testing'; +import { + FetchOpts, + pessimisticUpdate, + optimisticUpdate, + fetch +} from '../src/data-persistence'; // interfaces type Todo = { @@ -97,6 +103,7 @@ describe('DataPersistence', () => { }, onError: () => null }); + constructor(private s: DataPersistence) {} } @@ -241,6 +248,10 @@ describe('DataPersistence', () => { type: 'GET_TODOS'; }; + type GetTodo = { + type: 'GET_TODO'; + }; + @Injectable() class TodoEffects { @Effect() @@ -256,6 +267,21 @@ describe('DataPersistence', () => { onError: (a, e: any) => null }); + @Effect() + loadTodosWithOperator = this.s.actions + .ofType('GET_TODOS') + .pipe( + withLatestFrom(this.s.store), + fetch({ + run: (action, state) => { + return of({ + type: 'TODOS', + payload: { user: state.user, todos: 'some todos' } + }).pipe(delay(1)); + } + }) + ); + constructor(private s: DataPersistence) {} } @@ -286,6 +312,22 @@ describe('DataPersistence', () => { done(); }); + + it('should work with an operator', async done => { + actions = of( + { type: 'GET_TODOS', payload: {} }, + { type: 'GET_TODOS', payload: {} } + ); + + expect( + await readAll(TestBed.get(TodoEffects).loadTodosWithOperator) + ).toEqual([ + { type: 'TODOS', payload: { user: 'bob', todos: 'some todos' } }, + { type: 'TODOS', payload: { user: 'bob', todos: 'some todos' } } + ]); + + done(); + }); }); describe('id', () => { @@ -355,6 +397,20 @@ describe('DataPersistence', () => { onError: (a, e: any) => null }); + @Effect() + loadTodoWithOperator = this.s.actions + .ofType('UPDATE_TODO') + .pipe( + withLatestFrom(this.s.store), + pessimisticUpdate({ + run: (a, state) => ({ + type: 'TODO_UPDATED', + payload: { user: state.user, newTitle: a.payload.newTitle } + }), + onError: (a, e: any) => null + }) + ); + constructor(private s: DataPersistence) {} } @@ -387,6 +443,24 @@ describe('DataPersistence', () => { done(); }); + + it('should work with an operator', async done => { + actions = of({ + type: 'UPDATE_TODO', + payload: { newTitle: 'newTitle' } + }); + + expect( + await readAll(TestBed.get(TodoEffects).loadTodoWithOperator) + ).toEqual([ + { + type: 'TODO_UPDATED', + payload: { user: 'bob', newTitle: 'newTitle' } + } + ]); + + done(); + }); }); describe('`run` throws an error', () => { @@ -504,6 +578,23 @@ describe('DataPersistence', () => { }) }); + @Effect() + loadTodoWithOperator = this.s.actions + .ofType('UPDATE_TODO') + .pipe( + withLatestFrom(this.s.store), + optimisticUpdate({ + run: (a, state) => { + throw new Error('boom'); + }, + + undoAction: (a, e: any) => ({ + type: 'UNDO_UPDATE_TODO', + payload: a.payload + }) + }) + ); + constructor(private s: DataPersistence) {} } @@ -534,6 +625,22 @@ describe('DataPersistence', () => { done(); }); + + it('should work with an operator', async done => { + actions = of({ + type: 'UPDATE_TODO', + payload: { newTitle: 'newTitle' } + }); + + const [a]: any = await readAll( + TestBed.get(TodoEffects).loadTodoWithOperator + ); + + expect(a.type).toEqual('UNDO_UPDATE_TODO'); + expect(a.payload.newTitle).toEqual('newTitle'); + + done(); + }); }); }); }); diff --git a/packages/nx/src/data-persistence.ts b/packages/nx/src/data-persistence.ts index 6bbdae8057..dc2d1affa5 100644 --- a/packages/nx/src/data-persistence.ts +++ b/packages/nx/src/data-persistence.ts @@ -1,9 +1,13 @@ import { Injectable, Type } from '@angular/core'; -import { ActivatedRouteSnapshot, RouterStateSnapshot } from '@angular/router'; +import { + ActivatedRouteSnapshot, + RouterStateSnapshot, + RouterState +} from '@angular/router'; import { Actions } from '@ngrx/effects'; import { ROUTER_NAVIGATION, RouterNavigationAction } from '@ngrx/router-store'; import { Action, Store } from '@ngrx/store'; -import { Observable, of } from 'rxjs'; +import { Observable, of, OperatorFunction } from 'rxjs'; import { catchError, concatMap, @@ -47,6 +51,135 @@ export interface HandleNavigationOpts { onError?(a: ActivatedRouteSnapshot, e: any): Observable | any; } +export type ActionOrActionWithState = A | [A, T]; +export type ActionStateStream = Observable>; + +export function pessimisticUpdate( + opts: PessimisticUpdateOpts +) { + return (source: ActionStateStream): Observable => { + return source.pipe( + mapActionAndState(), + concatMap(runWithErrorHandling(opts.run, opts.onError)) + ); + }; +} + +export function optimisticUpdate( + opts: OptimisticUpdateOpts +) { + return (source: ActionStateStream): Observable => { + return source.pipe( + mapActionAndState(), + concatMap(runWithErrorHandling(opts.run, opts.undoAction)) + ); + }; +} + +export function fetch(opts: FetchOpts) { + return (source: ActionStateStream): Observable => { + if (opts.id) { + const groupedFetches = source.pipe( + mapActionAndState(), + groupBy(([action, store]) => { + return opts.id(action, store); + }) + ); + + return groupedFetches.pipe( + mergeMap(pairs => + pairs.pipe(switchMap(runWithErrorHandling(opts.run, opts.onError))) + ) + ); + } + + return source.pipe( + mapActionAndState(), + concatMap(runWithErrorHandling(opts.run, opts.onError)) + ); + }; +} + +export function navigation( + component: Type, + opts: HandleNavigationOpts +) { + return (source: ActionStateStream) => { + const nav = source.pipe( + mapActionAndState(), + filter(([action, state]) => isStateSnapshot(action)), + map(([action, state]) => { + if (!isStateSnapshot(action)) { + // Because of the above filter we'll never get here, + // but this properly type narrows `action` + return; + } + + return [ + findSnapshot(component, action.payload.routerState.root), + state + ] as [ActivatedRouteSnapshot, T]; + }), + filter(([snapshot, state]) => !!snapshot) + ); + + return nav.pipe(switchMap(runWithErrorHandling(opts.run, opts.onError))); + }; +} + +function isStateSnapshot( + action: any +): action is RouterNavigationAction { + return action.type === ROUTER_NAVIGATION; +} + +function runWithErrorHandling( + run: (a: A, state?: T) => Observable | R | void, + onError: any +) { + return ([action, state]: [A, T]): Observable => { + try { + const r = wrapIntoObservable(run(action, state)); + return r.pipe(catchError(e => wrapIntoObservable(onError(action, e)))); + } catch (e) { + return wrapIntoObservable(onError(action, e)); + } + }; +} + +/** + * @whatItDoes maps Observable to + * Observable<[Action, State]> + */ +function mapActionAndState() { + return (source: Observable>) => { + return source.pipe( + map(value => { + const [action, store] = normalizeActionAndState(value); + return [action, store] as [A, T]; + }) + ); + }; +} + +/** + * @whatItDoes Normalizes either a bare action or an array of action and state + * into an array of action and state (or undefined) + */ +function normalizeActionAndState( + args: ActionOrActionWithState +): [A, T] { + let action: A, state: T; + + if (args instanceof Array) { + [action, state] = args; + } else { + action = args; + } + + return [action, state]; +} + /** * @whatItDoes Provides convenience methods for implementing common operations of persisting data. */ @@ -106,9 +239,7 @@ export class DataPersistence { ): Observable { const nav = this.actions.ofType(actionType); const pairs = nav.pipe(withLatestFrom(this.store)); - return pairs.pipe( - concatMap(this.runWithErrorHandling(opts.run, opts.onError)) - ); + return pairs.pipe(pessimisticUpdate(opts)); } /** @@ -163,9 +294,7 @@ export class DataPersistence { ): Observable { const nav = this.actions.ofType(actionType); const pairs = nav.pipe(withLatestFrom(this.store)); - return pairs.pipe( - concatMap(this.runWithErrorHandling(opts.run, opts.undoAction)) - ); + return pairs.pipe(optimisticUpdate(opts)); } /** @@ -242,22 +371,7 @@ export class DataPersistence { const nav = this.actions.ofType(actionType); const allPairs = nav.pipe(withLatestFrom(this.store)); - if (opts.id) { - const groupedFetches = allPairs.pipe( - groupBy(([action, store]) => opts.id(action, store)) - ); - return groupedFetches.pipe( - mergeMap(pairs => - pairs.pipe( - switchMap(this.runWithErrorHandling(opts.run, opts.onError)) - ) - ) - ); - } else { - return allPairs.pipe( - concatMap(this.runWithErrorHandling(opts.run, opts.onError)) - ); - } + return allPairs.pipe(fetch(opts)); } /** @@ -297,31 +411,12 @@ export class DataPersistence { component: Type, opts: HandleNavigationOpts ): Observable { - const nav = this.actions - .ofType>(ROUTER_NAVIGATION) - .pipe( - map(a => findSnapshot(component, a.payload.routerState.root)), - filter(s => !!s) - ); + const nav = this.actions.ofType< + RouterNavigationAction + >(ROUTER_NAVIGATION); const pairs = nav.pipe(withLatestFrom(this.store)); - return pairs.pipe( - switchMap(this.runWithErrorHandling(opts.run, opts.onError)) - ); - } - - private runWithErrorHandling( - run: (a: A, state?: T) => Observable | R | void, - onError: any - ) { - return ([action, state]: [A, T]): Observable => { - try { - const r = wrapIntoObservable(run(action, state)); - return r.pipe(catchError(e => wrapIntoObservable(onError(action, e)))); - } catch (e) { - return wrapIntoObservable(onError(action, e)); - } - }; + return pairs.pipe(navigation(component, opts)); } }