feat(nx): Allow DataPersistence to take action streams
Currently, DataPersistence methods such as `fetch` and `optimisticUpdate` take a string as their first argument, which they use to filter incoming action types. This can lead to inflexibility in certain cases, such as when you want to filter the action stream before it gets to the DataPersistence handler, or when you want to handle multiple action types with the same effect (as suggested by Mike Ryan in his "Good Action Hygiene with NgRx talk: https://www.youtube.com/watch?v=JmnsEvoy-gY) This PR refactors `optimisticUpdate`, `pessimisticUpdate`, `fetch` and `navigation` into pipeable operators, and implements the existing DataPersistence methods in terms of these operators. This allows users to continue using instance methods and strings, but enables more advanced cases where more control over the action and state streams is needed.
This commit is contained in:
parent
b960480fce
commit
1e9871e6d7
@ -1,17 +1,23 @@
|
||||
import { Component, Injectable } from '@angular/core';
|
||||
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
|
||||
import { Router } from '@angular/router';
|
||||
import { Router, RouterStateSnapshot } from '@angular/router';
|
||||
import { RouterTestingModule } from '@angular/router/testing';
|
||||
import { Actions, Effect, EffectsModule } from '@ngrx/effects';
|
||||
import { provideMockActions } from '@ngrx/effects/testing';
|
||||
import { StoreRouterConnectingModule } from '@ngrx/router-store';
|
||||
import { Store, StoreModule } from '@ngrx/store';
|
||||
import { Observable, of, Subject, throwError } from 'rxjs';
|
||||
import { delay } from 'rxjs/operators';
|
||||
import { delay, withLatestFrom } from 'rxjs/operators';
|
||||
|
||||
import { DataPersistence } from '../index';
|
||||
import { NxModule } from '../src/nx.module';
|
||||
import { readAll } from '../testing';
|
||||
import {
|
||||
FetchOpts,
|
||||
pessimisticUpdate,
|
||||
optimisticUpdate,
|
||||
fetch
|
||||
} from '../src/data-persistence';
|
||||
|
||||
// interfaces
|
||||
type Todo = {
|
||||
@ -97,6 +103,7 @@ describe('DataPersistence', () => {
|
||||
},
|
||||
onError: () => null
|
||||
});
|
||||
|
||||
constructor(private s: DataPersistence<TodosState>) {}
|
||||
}
|
||||
|
||||
@ -241,6 +248,10 @@ describe('DataPersistence', () => {
|
||||
type: 'GET_TODOS';
|
||||
};
|
||||
|
||||
type GetTodo = {
|
||||
type: 'GET_TODO';
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
class TodoEffects {
|
||||
@Effect()
|
||||
@ -256,6 +267,21 @@ describe('DataPersistence', () => {
|
||||
onError: (a, e: any) => null
|
||||
});
|
||||
|
||||
@Effect()
|
||||
loadTodosWithOperator = this.s.actions
|
||||
.ofType<GetTodos>('GET_TODOS')
|
||||
.pipe(
|
||||
withLatestFrom(this.s.store),
|
||||
fetch({
|
||||
run: (action, state) => {
|
||||
return of({
|
||||
type: 'TODOS',
|
||||
payload: { user: state.user, todos: 'some todos' }
|
||||
}).pipe(delay(1));
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
constructor(private s: DataPersistence<TodosState>) {}
|
||||
}
|
||||
|
||||
@ -286,6 +312,22 @@ describe('DataPersistence', () => {
|
||||
|
||||
done();
|
||||
});
|
||||
|
||||
it('should work with an operator', async done => {
|
||||
actions = of(
|
||||
{ type: 'GET_TODOS', payload: {} },
|
||||
{ type: 'GET_TODOS', payload: {} }
|
||||
);
|
||||
|
||||
expect(
|
||||
await readAll(TestBed.get(TodoEffects).loadTodosWithOperator)
|
||||
).toEqual([
|
||||
{ type: 'TODOS', payload: { user: 'bob', todos: 'some todos' } },
|
||||
{ type: 'TODOS', payload: { user: 'bob', todos: 'some todos' } }
|
||||
]);
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
describe('id', () => {
|
||||
@ -355,6 +397,20 @@ describe('DataPersistence', () => {
|
||||
onError: (a, e: any) => null
|
||||
});
|
||||
|
||||
@Effect()
|
||||
loadTodoWithOperator = this.s.actions
|
||||
.ofType<UpdateTodo>('UPDATE_TODO')
|
||||
.pipe(
|
||||
withLatestFrom(this.s.store),
|
||||
pessimisticUpdate({
|
||||
run: (a, state) => ({
|
||||
type: 'TODO_UPDATED',
|
||||
payload: { user: state.user, newTitle: a.payload.newTitle }
|
||||
}),
|
||||
onError: (a, e: any) => null
|
||||
})
|
||||
);
|
||||
|
||||
constructor(private s: DataPersistence<TodosState>) {}
|
||||
}
|
||||
|
||||
@ -387,6 +443,24 @@ describe('DataPersistence', () => {
|
||||
|
||||
done();
|
||||
});
|
||||
|
||||
it('should work with an operator', async done => {
|
||||
actions = of({
|
||||
type: 'UPDATE_TODO',
|
||||
payload: { newTitle: 'newTitle' }
|
||||
});
|
||||
|
||||
expect(
|
||||
await readAll(TestBed.get(TodoEffects).loadTodoWithOperator)
|
||||
).toEqual([
|
||||
{
|
||||
type: 'TODO_UPDATED',
|
||||
payload: { user: 'bob', newTitle: 'newTitle' }
|
||||
}
|
||||
]);
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
describe('`run` throws an error', () => {
|
||||
@ -504,6 +578,23 @@ describe('DataPersistence', () => {
|
||||
})
|
||||
});
|
||||
|
||||
@Effect()
|
||||
loadTodoWithOperator = this.s.actions
|
||||
.ofType<UpdateTodo>('UPDATE_TODO')
|
||||
.pipe(
|
||||
withLatestFrom(this.s.store),
|
||||
optimisticUpdate({
|
||||
run: (a, state) => {
|
||||
throw new Error('boom');
|
||||
},
|
||||
|
||||
undoAction: (a, e: any) => ({
|
||||
type: 'UNDO_UPDATE_TODO',
|
||||
payload: a.payload
|
||||
})
|
||||
})
|
||||
);
|
||||
|
||||
constructor(private s: DataPersistence<TodosState>) {}
|
||||
}
|
||||
|
||||
@ -534,6 +625,22 @@ describe('DataPersistence', () => {
|
||||
|
||||
done();
|
||||
});
|
||||
|
||||
it('should work with an operator', async done => {
|
||||
actions = of({
|
||||
type: 'UPDATE_TODO',
|
||||
payload: { newTitle: 'newTitle' }
|
||||
});
|
||||
|
||||
const [a]: any = await readAll(
|
||||
TestBed.get(TodoEffects).loadTodoWithOperator
|
||||
);
|
||||
|
||||
expect(a.type).toEqual('UNDO_UPDATE_TODO');
|
||||
expect(a.payload.newTitle).toEqual('newTitle');
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,9 +1,13 @@
|
||||
import { Injectable, Type } from '@angular/core';
|
||||
import { ActivatedRouteSnapshot, RouterStateSnapshot } from '@angular/router';
|
||||
import {
|
||||
ActivatedRouteSnapshot,
|
||||
RouterStateSnapshot,
|
||||
RouterState
|
||||
} from '@angular/router';
|
||||
import { Actions } from '@ngrx/effects';
|
||||
import { ROUTER_NAVIGATION, RouterNavigationAction } from '@ngrx/router-store';
|
||||
import { Action, Store } from '@ngrx/store';
|
||||
import { Observable, of } from 'rxjs';
|
||||
import { Observable, of, OperatorFunction } from 'rxjs';
|
||||
import {
|
||||
catchError,
|
||||
concatMap,
|
||||
@ -47,6 +51,135 @@ export interface HandleNavigationOpts<T> {
|
||||
onError?(a: ActivatedRouteSnapshot, e: any): Observable<any> | any;
|
||||
}
|
||||
|
||||
export type ActionOrActionWithState<T, A> = A | [A, T];
|
||||
export type ActionStateStream<T, A> = Observable<ActionOrActionWithState<T, A>>;
|
||||
|
||||
export function pessimisticUpdate<T, A extends Action>(
|
||||
opts: PessimisticUpdateOpts<T, A>
|
||||
) {
|
||||
return (source: ActionStateStream<T, A>): Observable<Action> => {
|
||||
return source.pipe(
|
||||
mapActionAndState(),
|
||||
concatMap(runWithErrorHandling(opts.run, opts.onError))
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
export function optimisticUpdate<T, A extends Action>(
|
||||
opts: OptimisticUpdateOpts<T, A>
|
||||
) {
|
||||
return (source: ActionStateStream<T, A>): Observable<Action> => {
|
||||
return source.pipe(
|
||||
mapActionAndState(),
|
||||
concatMap(runWithErrorHandling(opts.run, opts.undoAction))
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
export function fetch<T, A extends Action>(opts: FetchOpts<T, A>) {
|
||||
return (source: ActionStateStream<T, A>): Observable<Action> => {
|
||||
if (opts.id) {
|
||||
const groupedFetches = source.pipe(
|
||||
mapActionAndState(),
|
||||
groupBy(([action, store]) => {
|
||||
return opts.id(action, store);
|
||||
})
|
||||
);
|
||||
|
||||
return groupedFetches.pipe(
|
||||
mergeMap(pairs =>
|
||||
pairs.pipe(switchMap(runWithErrorHandling(opts.run, opts.onError)))
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return source.pipe(
|
||||
mapActionAndState(),
|
||||
concatMap(runWithErrorHandling(opts.run, opts.onError))
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
export function navigation<T, A extends Action>(
|
||||
component: Type<any>,
|
||||
opts: HandleNavigationOpts<T>
|
||||
) {
|
||||
return (source: ActionStateStream<T, A>) => {
|
||||
const nav = source.pipe(
|
||||
mapActionAndState(),
|
||||
filter(([action, state]) => isStateSnapshot(action)),
|
||||
map(([action, state]) => {
|
||||
if (!isStateSnapshot(action)) {
|
||||
// Because of the above filter we'll never get here,
|
||||
// but this properly type narrows `action`
|
||||
return;
|
||||
}
|
||||
|
||||
return [
|
||||
findSnapshot(component, action.payload.routerState.root),
|
||||
state
|
||||
] as [ActivatedRouteSnapshot, T];
|
||||
}),
|
||||
filter(([snapshot, state]) => !!snapshot)
|
||||
);
|
||||
|
||||
return nav.pipe(switchMap(runWithErrorHandling(opts.run, opts.onError)));
|
||||
};
|
||||
}
|
||||
|
||||
function isStateSnapshot(
|
||||
action: any
|
||||
): action is RouterNavigationAction<RouterStateSnapshot> {
|
||||
return action.type === ROUTER_NAVIGATION;
|
||||
}
|
||||
|
||||
function runWithErrorHandling<T, A, R>(
|
||||
run: (a: A, state?: T) => Observable<R> | R | void,
|
||||
onError: any
|
||||
) {
|
||||
return ([action, state]: [A, T]): Observable<R> => {
|
||||
try {
|
||||
const r = wrapIntoObservable(run(action, state));
|
||||
return r.pipe(catchError(e => wrapIntoObservable(onError(action, e))));
|
||||
} catch (e) {
|
||||
return wrapIntoObservable(onError(action, e));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @whatItDoes maps Observable<Action | [Action, State]> to
|
||||
* Observable<[Action, State]>
|
||||
*/
|
||||
function mapActionAndState<T, A>() {
|
||||
return (source: Observable<ActionOrActionWithState<T, A>>) => {
|
||||
return source.pipe(
|
||||
map(value => {
|
||||
const [action, store] = normalizeActionAndState(value);
|
||||
return [action, store] as [A, T];
|
||||
})
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @whatItDoes Normalizes either a bare action or an array of action and state
|
||||
* into an array of action and state (or undefined)
|
||||
*/
|
||||
function normalizeActionAndState<T, A>(
|
||||
args: ActionOrActionWithState<T, A>
|
||||
): [A, T] {
|
||||
let action: A, state: T;
|
||||
|
||||
if (args instanceof Array) {
|
||||
[action, state] = args;
|
||||
} else {
|
||||
action = args;
|
||||
}
|
||||
|
||||
return [action, state];
|
||||
}
|
||||
|
||||
/**
|
||||
* @whatItDoes Provides convenience methods for implementing common operations of persisting data.
|
||||
*/
|
||||
@ -106,9 +239,7 @@ export class DataPersistence<T> {
|
||||
): Observable<any> {
|
||||
const nav = this.actions.ofType<A>(actionType);
|
||||
const pairs = nav.pipe(withLatestFrom(this.store));
|
||||
return pairs.pipe(
|
||||
concatMap(this.runWithErrorHandling(opts.run, opts.onError))
|
||||
);
|
||||
return pairs.pipe(pessimisticUpdate(opts));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -163,9 +294,7 @@ export class DataPersistence<T> {
|
||||
): Observable<any> {
|
||||
const nav = this.actions.ofType<A>(actionType);
|
||||
const pairs = nav.pipe(withLatestFrom(this.store));
|
||||
return pairs.pipe(
|
||||
concatMap(this.runWithErrorHandling(opts.run, opts.undoAction))
|
||||
);
|
||||
return pairs.pipe(optimisticUpdate(opts));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -242,22 +371,7 @@ export class DataPersistence<T> {
|
||||
const nav = this.actions.ofType<A>(actionType);
|
||||
const allPairs = nav.pipe(withLatestFrom(this.store));
|
||||
|
||||
if (opts.id) {
|
||||
const groupedFetches = allPairs.pipe(
|
||||
groupBy(([action, store]) => opts.id(action, store))
|
||||
);
|
||||
return groupedFetches.pipe(
|
||||
mergeMap(pairs =>
|
||||
pairs.pipe(
|
||||
switchMap(this.runWithErrorHandling(opts.run, opts.onError))
|
||||
)
|
||||
)
|
||||
);
|
||||
} else {
|
||||
return allPairs.pipe(
|
||||
concatMap(this.runWithErrorHandling(opts.run, opts.onError))
|
||||
);
|
||||
}
|
||||
return allPairs.pipe(fetch(opts));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -297,31 +411,12 @@ export class DataPersistence<T> {
|
||||
component: Type<any>,
|
||||
opts: HandleNavigationOpts<T>
|
||||
): Observable<any> {
|
||||
const nav = this.actions
|
||||
.ofType<RouterNavigationAction<RouterStateSnapshot>>(ROUTER_NAVIGATION)
|
||||
.pipe(
|
||||
map(a => findSnapshot(component, a.payload.routerState.root)),
|
||||
filter(s => !!s)
|
||||
);
|
||||
const nav = this.actions.ofType<
|
||||
RouterNavigationAction<RouterStateSnapshot>
|
||||
>(ROUTER_NAVIGATION);
|
||||
|
||||
const pairs = nav.pipe(withLatestFrom(this.store));
|
||||
return pairs.pipe(
|
||||
switchMap(this.runWithErrorHandling(opts.run, opts.onError))
|
||||
);
|
||||
}
|
||||
|
||||
private runWithErrorHandling<A, R>(
|
||||
run: (a: A, state?: T) => Observable<R> | R | void,
|
||||
onError: any
|
||||
) {
|
||||
return ([action, state]: [A, T]): Observable<R> => {
|
||||
try {
|
||||
const r = wrapIntoObservable(run(action, state));
|
||||
return r.pipe(catchError(e => wrapIntoObservable(onError(action, e))));
|
||||
} catch (e) {
|
||||
return wrapIntoObservable(onError(action, e));
|
||||
}
|
||||
};
|
||||
return pairs.pipe(navigation(component, opts));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user