fix(core): catch rejected promises in convertNxExecutor (#12627)

This commit is contained in:
Philip Fulcher 2022-10-14 15:35:33 -06:00 committed by GitHub
parent 628c9c0c36
commit 62e91f37af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 58 deletions

View File

@ -48,39 +48,43 @@ function toObservable<T extends { success: boolean }>(
): Observable<T> { ): Observable<T> {
return new (require('rxjs') as typeof import('rxjs')).Observable( return new (require('rxjs') as typeof import('rxjs')).Observable(
(subscriber) => { (subscriber) => {
promiseOrAsyncIterator.then((value) => { promiseOrAsyncIterator
if (!(value as any).next) { .then((value) => {
subscriber.next(value as T); if (!(value as any).next) {
subscriber.complete(); subscriber.next(value as T);
} else { subscriber.complete();
let asyncIterator = value as AsyncIterableIterator<T>; } else {
let asyncIterator = value as AsyncIterableIterator<T>;
function recurse(iterator: AsyncIterableIterator<T>) { function recurse(iterator: AsyncIterableIterator<T>) {
iterator iterator
.next() .next()
.then((result) => { .then((result) => {
if (!result.done) { if (!result.done) {
subscriber.next(result.value);
recurse(iterator);
} else {
if (result.value) {
subscriber.next(result.value); subscriber.next(result.value);
recurse(iterator);
} else {
if (result.value) {
subscriber.next(result.value);
}
subscriber.complete();
} }
subscriber.complete(); })
} .catch((e) => {
}) subscriber.error(e);
.catch((e) => { });
subscriber.error(e); }
});
recurse(asyncIterator);
return () => {
asyncIterator.return();
};
} }
})
recurse(asyncIterator); .catch((err) => {
subscriber.error(err);
return () => { });
asyncIterator.return();
};
}
});
} }
); );
} }

View File

@ -51,39 +51,43 @@ function toObservable<T extends { success: boolean }>(
): Observable<T> { ): Observable<T> {
return new (require('rxjs') as typeof import('rxjs')).Observable( return new (require('rxjs') as typeof import('rxjs')).Observable(
(subscriber) => { (subscriber) => {
promiseOrAsyncIterator.then((value) => { promiseOrAsyncIterator
if (!(value as any).next) { .then((value) => {
subscriber.next(value as T); if (!(value as any).next) {
subscriber.complete(); subscriber.next(value as T);
} else { subscriber.complete();
let asyncIterator = value as AsyncIterableIterator<T>; } else {
let asyncIterator = value as AsyncIterableIterator<T>;
function recurse(iterator: AsyncIterableIterator<T>) { function recurse(iterator: AsyncIterableIterator<T>) {
iterator iterator
.next() .next()
.then((result) => { .then((result) => {
if (!result.done) { if (!result.done) {
subscriber.next(result.value);
recurse(iterator);
} else {
if (result.value) {
subscriber.next(result.value); subscriber.next(result.value);
recurse(iterator);
} else {
if (result.value) {
subscriber.next(result.value);
}
subscriber.complete();
} }
subscriber.complete(); })
} .catch((e) => {
}) subscriber.error(e);
.catch((e) => { });
subscriber.error(e); }
});
recurse(asyncIterator);
return () => {
asyncIterator.return();
};
} }
})
recurse(asyncIterator); .catch((err) => {
subscriber.error(err);
return () => { });
asyncIterator.return();
};
}
});
} }
); );
} }