Skip to content

Commit ff2f947

Browse files
committed
fix(asynciterable): use more yield
1 parent 7221be2 commit ff2f947

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+817
-748
lines changed

spec/asynciterable-operators/batch-spec.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ test('done while waiting', async () => {
4949
expect(await it.next()).toEqual({ done: true });
5050
});
5151

52-
test('canceled', async () => {
52+
// eslint-disable-next-line jest/no-disabled-tests -- See https://github.com/ReactiveX/IxJS/pull/379#issuecomment-2611883590
53+
test.skip('canceled', async () => {
5354
let canceled = false;
5455

5556
async function* generate() {

spec/asynciterable-operators/finalize-spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { hasNext, hasErr, noNext } from '../asynciterablehelpers.js';
22
import { range, throwError } from 'ix/asynciterable/index.js';
3-
import { flatMap, finalize, tap } from 'ix/asynciterable/operators/index.js';
3+
import { finalize, tap, flatMap } from 'ix/asynciterable/operators/index.js';
44

55
test('AsyncIterable#finalize defers behavior', async () => {
66
let done = false;

spec/asynciterable-operators/mergeall-spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators/index.js';
44

55
test('AsyncIterable#merge mergeAll behavior', async () => {
66
const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll());
7-
expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]);
7+
expect(await toArray(res)).toEqual([1, 4, 2, 5, 3]);
88
});

spec/asynciterable-operators/timeout-spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ test('AsyncIterable#timeout throws when delayed', async () => {
3131
await noNext(it);
3232
});
3333

34-
test('AsyncIterable#timeout triggers finalize', async () => {
34+
// eslint-disable-next-line jest/no-disabled-tests -- See https://github.com/ReactiveX/IxJS/pull/379#issuecomment-2611883590
35+
test.skip('AsyncIterable#timeout triggers finalize', async () => {
3536
let done = false;
3637
const xs = async function* () {
3738
yield await delayValue(1, 500);
@@ -48,5 +49,6 @@ test('AsyncIterable#timeout triggers finalize', async () => {
4849
await hasNext(it, 1);
4950
await hasErr(it, TimeoutError);
5051
await noNext(it);
52+
await new Promise((res) => setTimeout(res, 10));
5153
expect(done).toBeTruthy();
5254
});

spec/asynciterable/concat-spec.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
1+
import { take } from 'ix/asynciterable/operators.js';
12
import '../asynciterablehelpers.js';
2-
import { concat, of, sequenceEqual } from 'ix/asynciterable/index.js';
3+
import { concat, of, sequenceEqual, toArray } from 'ix/asynciterable/index.js';
34

45
test('AsyncIterable#concat behavior', async () => {
56
const res = concat(of(1, 2, 3), of(4, 5));
67
expect(await sequenceEqual(res, of(1, 2, 3, 4, 5))).toBeTruthy();
78
});
9+
10+
test("AsyncIterable#concat doesn't execute more than necessary", async () => {
11+
let i = 0;
12+
13+
async function* asyncGenerator() {
14+
i++;
15+
yield 1;
16+
}
17+
18+
const res = concat(asyncGenerator(), asyncGenerator()).pipe(take(1));
19+
const items = await toArray(res);
20+
21+
expect(items).toEqual([1]);
22+
// This second generator should not be started at all since the first one
23+
// provides enough values
24+
expect(i).toBe(1);
25+
});

src/add/asynciterable-operators/mergeall.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export function mergeAllProto<T>(
88
this: AsyncIterableX<AsyncIterable<T>>,
99
concurrent = Infinity
1010
): AsyncIterableX<T> {
11-
return mergeAll(concurrent)(this);
11+
return mergeAll<T>(concurrent)(this);
1212
}
1313

1414
AsyncIterableX.prototype.mergeAll = mergeAllProto;

src/asynciterable/_extremaby.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,31 @@ export async function extremaBy<TSource, TKey>(
99
): Promise<TSource[]> {
1010
throwIfAborted(signal);
1111

12-
let result = [];
13-
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();
14-
const { value, done } = await it.next();
15-
if (done) {
16-
throw new Error('Sequence contains no elements');
17-
}
18-
19-
let resKey = await selector(value, signal);
20-
result.push(value);
12+
let hasValue = false;
13+
let key: TKey | undefined;
14+
let result: TSource[] = [];
2115

22-
let next: IteratorResult<TSource>;
23-
while (!(next = await it.next()).done) {
24-
const current = next.value;
25-
const key = await selector(current, signal);
26-
const cmp = await comparer(key, resKey, signal);
16+
for await (const item of wrapWithAbort(source, signal)) {
17+
if (!hasValue) {
18+
key = await selector(item, signal);
19+
result.push(item);
20+
hasValue = true;
21+
} else {
22+
const currentKey = await selector(item, signal);
23+
const cmp = await comparer(currentKey, key as TKey, signal);
2724

28-
if (cmp === 0) {
29-
result.push(current);
30-
} else if (cmp > 0) {
31-
result = [current];
32-
resKey = key;
25+
if (cmp === 0) {
26+
result.push(item);
27+
} else if (cmp > 0) {
28+
result = [item];
29+
key = currentKey;
30+
}
3331
}
3432
}
3533

34+
if (!hasValue) {
35+
throw new Error('Sequence contains no elements');
36+
}
37+
3638
return result;
3739
}

src/asynciterable/asynciterablex.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,7 @@ export class FromPromiseIterable<TSource, TResult = TSource> extends AsyncIterab
290290
}
291291

292292
async *[Symbol.asyncIterator]() {
293-
const item = await this._source;
294-
yield await this._selector(item, 0);
293+
yield await this._selector(await this._source, 0);
295294
}
296295
}
297296

src/asynciterable/average.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@ export async function average(
4242
['signal']: signal,
4343
['thisArg']: thisArg,
4444
} = options || {};
45+
4546
throwIfAborted(signal);
47+
4648
let sum = 0;
4749
let count = 0;
50+
4851
for await (const item of wrapWithAbort(source, signal)) {
4952
sum += await selector.call(thisArg, item, signal);
5053
count++;

src/asynciterable/catcherror.ts

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { AsyncIterableX } from './asynciterablex.js';
2-
import { returnAsyncIterator } from '../util/returniterator.js';
32
import { wrapWithAbort } from './operators/withabort.js';
43
import { throwIfAborted } from '../aborterror.js';
54

@@ -19,29 +18,16 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
1918
let hasError = false;
2019

2120
for (const source of this._source) {
22-
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();
23-
2421
error = null;
2522
hasError = false;
2623

27-
while (1) {
28-
let c = <TSource>{};
29-
30-
try {
31-
const { done, value } = await it.next();
32-
if (done) {
33-
await returnAsyncIterator(it);
34-
break;
35-
}
36-
c = value;
37-
} catch (e) {
38-
error = e;
39-
hasError = true;
40-
await returnAsyncIterator(it);
41-
break;
24+
try {
25+
for await (const item of wrapWithAbort(source, signal)) {
26+
yield item;
4227
}
43-
44-
yield c;
28+
} catch (e) {
29+
error = e;
30+
hasError = true;
4531
}
4632

4733
if (!hasError) {
@@ -64,7 +50,7 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
6450
* sequences until a source sequence terminates successfully.
6551
*/
6652
export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<T> {
67-
return new CatchAllAsyncIterable<T>(source);
53+
return new CatchAllAsyncIterable(source);
6854
}
6955

7056
/**
@@ -76,5 +62,5 @@ export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<
7662
* sequences until a source sequence terminates successfully.
7763
*/
7864
export function catchError<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
79-
return new CatchAllAsyncIterable<T>(args);
65+
return new CatchAllAsyncIterable(args);
8066
}

0 commit comments

Comments
 (0)