From f287389ecb6cbb85be9303c20222eb70eab42eec Mon Sep 17 00:00:00 2001 From: vsavkin Date: Wed, 4 Oct 2017 11:07:20 -0400 Subject: [PATCH] fix(nx): adds the notion of "entity id" to data persistence --- packages/nx/spec/data-persistence.spec.ts | 64 ++++++++++++++-- packages/nx/src/data-persistence.ts | 92 +++++++++++++++++------ 2 files changed, 130 insertions(+), 26 deletions(-) diff --git a/packages/nx/spec/data-persistence.spec.ts b/packages/nx/spec/data-persistence.spec.ts index 34216988d0..49e1292400 100644 --- a/packages/nx/spec/data-persistence.spec.ts +++ b/packages/nx/spec/data-persistence.spec.ts @@ -1,3 +1,5 @@ +import 'rxjs/add/operator/delay'; + import {Component, Injectable} from '@angular/core'; import {ComponentFixture, fakeAsync, TestBed, tick} from '@angular/core/testing'; import {ActivatedRouteSnapshot, Router} from '@angular/router'; @@ -9,6 +11,7 @@ import {Store, StoreModule} from '@ngrx/store'; import {Observable} from 'rxjs/Observable'; import {of} from 'rxjs/observable/of'; import {_throw} from 'rxjs/observable/throw'; +import {delay} from 'rxjs/operator/delay'; import {Subject} from 'rxjs/Subject'; import {DataPersistence} from '../index'; @@ -212,13 +215,14 @@ describe('DataPersistence', () => { TestBed.configureTestingModule({providers: [DataPersistence]}); }); - describe('successful', () => { + describe('no id', () => { @Injectable() class TodoEffects { @Effect() - loadTodo = this.s.fetch('GET_TODOS', { + loadTodos = this.s.fetch('GET_TODOS', { run(a: any, state: TodosState) { - return ({type: 'TODOS', payload: {user: state.user, todos: 'some todos'}}); + // we need to introduce the delay to "enable" switchMap + return of ({type: 'TODOS', payload: {user: state.user, todos: 'some todos'}}).delay(1); }, onError(a: UpdateTodo, e: any) { @@ -244,10 +248,60 @@ describe('DataPersistence', () => { }); it('should work', async (done) => { - actions = of({type: 'GET_TODOS', payload: {}}); + actions = of({type: 'GET_TODOS', payload: {}}, {type: 'GET_TODOS', payload: {}}); + + expect(await readAll(TestBed.get(TodoEffects).loadTodos)).toEqual([ + {type: 'TODOS', payload: {user: 'bob', todos: 'some todos'}}, + {type: 'TODOS', payload: {user: 'bob', todos: 'some todos'}} + ]); + + done(); + }); + }); + + describe('id', () => { + @Injectable() + class TodoEffects { + @Effect() + loadTodo = this.s.fetch('GET_TODO', { + id(a: any, state: TodosState) { + return a.payload.id; + }, + + run(a: any, state: TodosState) { + // we need to introduce the delay to "enable" switchMap + return of ({type: 'TODO', payload: a.payload}).delay(1); + }, + + onError(a: UpdateTodo, e: any) { + return null; + } + }); + + constructor(private s: DataPersistence) {} + } + + function userReducer() { + return 'bob'; + } + + let actions: Observable; + + beforeEach(() => { + actions = new Subject(); + TestBed.configureTestingModule({ + providers: [TodoEffects, provideMockActions(() => actions)], + imports: [StoreModule.forRoot({user: userReducer})] + }) + }); + + it('should work', async (done) => { + actions = + of({type: 'GET_TODO', payload: {id: 1, value: '1'}}, {type: 'GET_TODO', payload: {id: 2, value: '2a'}}, + {type: 'GET_TODO', payload: {id: 2, value: '2b'}}); expect(await readAll(TestBed.get(TodoEffects).loadTodo)).toEqual([ - {type: 'TODOS', payload: {user: 'bob', todos: 'some todos'}} + {type: 'TODO', payload: {id: 1, value: '1'}}, {type: 'TODO', payload: {id: 2, value: '2b'}} ]); done(); diff --git a/packages/nx/src/data-persistence.ts b/packages/nx/src/data-persistence.ts index fd812fdd50..0f74cd09f7 100644 --- a/packages/nx/src/data-persistence.ts +++ b/packages/nx/src/data-persistence.ts @@ -8,7 +8,9 @@ import {of} from 'rxjs/observable/of'; import {_catch} from 'rxjs/operator/catch'; import {concatMap} from 'rxjs/operator/concatMap'; import {filter} from 'rxjs/operator/filter'; +import {groupBy} from 'rxjs/operator/groupBy'; import {map} from 'rxjs/operator/map'; +import {mergeMap} from 'rxjs/operator/mergeMap'; import {switchMap} from 'rxjs/operator/switchMap'; import {withLatestFrom} from 'rxjs/operator/withLatestFrom'; @@ -31,6 +33,7 @@ export interface OptimisticUpdateOpts { * See {@link DataPersistence.navigation} for more information. */ export interface FetchOpts { + id?(a: Action, state?: any): any; run(a: Action, state?: any): Observable|Action|void; onError?(a: Action, e: any): Observable|any; } @@ -43,8 +46,10 @@ export interface HandleNavigationOpts { onError?(a: ActivatedRouteSnapshot, e: any): Observable|any; } +type Pair = [Action, any]; + /** - * @whatItDoes Provides convenience methods for implementing common operations of talking to the backend. + * @whatItDoes Provides convenience methods for implementing common operations of persisting data. */ @Injectable() export class DataPersistence { @@ -54,11 +59,11 @@ export class DataPersistence { * * @whatItDoes Handles pessimistic updates (updating the server first). * - * It provides the action and the current state. It runs all updates in order by using `concatMap` to prevent race - * conditions. + * Update the server implemented naively suffers from race conditions and poor error handling. + * + * `pessimisticUpdate` addresses these problems--it runs all fetches in order, which removes race conditions + * and forces the developer to handle errors. * - * * `run` callback must return an action or an observable with an action. - * * `onError` is called when a server update was not successful. * ## Example: * * ```typescript @@ -96,12 +101,13 @@ export class DataPersistence { * * @whatItDoes Handles optimistic updates (updating the client first). * - * It provides the action and the current state. It runs all updates in order by using `concatMap` to prevent race - * conditions. + * `optimisticUpdate` addresses these problems--it runs all fetches in order, which removes race conditions + * and forces the developer to handle errors. * - * * `run` callback must return an action or an observable with an action. - * * `undoAction` is called server update was not successful. It must return an action or an observable with an action - * to undo the changes in the client state. + * `optimisticUpdate` is different from `pessimisticUpdate`. In case of a failure, when using `optimisticUpdate`, + * the developer already updated the state locally, so the developer must provide an undo action. + * + * The error handling must be done in the callback, or by means of the undo action. * * ## Example: * @@ -137,17 +143,17 @@ export class DataPersistence { * * @whatItDoes Handles data fetching. * - * It provides the action and the current state. It only runs the last fetch by using `switchMap`. + * Data fetching implemented naively suffers from race conditions and poor error handling. * - * * `run` callback must return an action or an observable with an action. - * * `onError` is called when a server request was not successful. + * `fetch` addresses these problems--it runs all fetches in order, which removes race conditions + * and forces the developer to handle errors. * * ## Example: * * ```typescript * @Injectable() * class TodoEffects { - * @Effect() loadTodo = this.s.fetch('GET_TODOS', { + * @Effect() loadTodos = this.s.fetch('GET_TODOS', { * // provides an action and the current state of the store * run(a: GetTodos, state: TodosState) { * return this.backend(state.user, a.payload).map(r => ({ @@ -165,22 +171,66 @@ export class DataPersistence { * constructor(private s: DataPersistence, private backend: Backend) {} * } * ``` + * + * This is correct, but because it set the concurrency to 1, it may not be performant. + * + * To fix that, you can provide the `id` function, like this: + * + * ```typescript + * @Injectable() + * class TodoEffects { + * @Effect() loadTodo = this.s.fetch('GET_TODO', { + * run(a: GetTodo, state: TodosState) { + * return a.payload.id; + * } + * + * // provides an action and the current state of the store + * run(a: GetTodo, state: TodosState) { + * return this.backend(state.user, a.payload).map(r => ({ + * type: 'TODO', + * payload: r + * }); + * }, + * + * onError(a: GetTodo, e: any): Action { + * // dispatch an undo action to undo the changes in the client state + * // return null; + * } + * }); + * + * constructor(private s: DataPersistence, private backend: Backend) {} + * } + * ``` + * + * With this setup, the requests for Todo 1 will run concurrently with the requests for Todo 2. + * + * In addition, if DataPersistence notices that there are multiple requests for Todo 1 scheduled, + * it will only run the last one. */ fetch(actionType: string, opts: FetchOpts): Observable { const nav = this.actions.ofType(actionType); - const pairs = withLatestFrom.call(nav, this.store); - return switchMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError)); + const allPairs = withLatestFrom.call(nav, this.store); + + if (opts.id) { + const groupedFetches: Observable> = groupBy.call(allPairs, (p) => opts.id(p[0], p[1])); + return mergeMap.call( + groupedFetches, + (pairs: Observable) => switchMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError))); + } else { + return concatMap.call(allPairs, this.runWithErrorHandling(opts.run, opts.onError)); + } } /** * @whatItDoes Handles data fetching as part of router navigation. * - * It checks if an activated router state contains the passed in component type, and, if it does, runs the `run` - * callback. It provides the activated snapshot associated with the component and the current state. It only runs the - * last request by using `switchMap`. + * Data fetching implemented naively suffers from race conditions and poor error handling. * - * * `run` callback must return an action or an observable with an action. - * * `onError` is called when a server request was not successful. + * `navigation` addresses these problems. + * + * It checks if an activated router state contains the passed in component type, and, if it does, runs the `run` + * callback. It provides the activated snapshot associated with the component and the current state. And it only runs + * the last request. * * ## Example: *