diff --git a/lib/each.ts b/lib/each.ts index 13b99fff7..53cdb6d87 100644 --- a/lib/each.ts +++ b/lib/each.ts @@ -1,6 +1,7 @@ +import { spawn } from "./spawn.ts"; +import { constant } from "./constant.ts"; import { createContext } from "./context.ts"; import { useScope } from "./scope.ts"; -import { spawn } from "./spawn.ts"; import type { Operation, Stream, Subscription } from "./types.ts"; import { withResolvers } from "./with-resolvers.ts"; @@ -28,9 +29,16 @@ import { withResolvers } from "./with-resolvers.ts"; * @param stream - the stream to iterate * @returns an operation to iterate `stream` */ -export function each(stream: Stream): Operation> { +export function each( + enumerable: Stream | Subscription, +): Operation> { return { *[Symbol.iterator]() { + let stream = typeof (enumerable as Subscription).next === + "function" + ? constant(enumerable as Subscription) + : enumerable as Stream; + let scope = yield* useScope(); if (!scope.hasOwn(EachStack)) { scope.set(EachStack, []); diff --git a/test/each.test.ts b/test/each.test.ts index 9a76188d2..9df6265a7 100644 --- a/test/each.test.ts +++ b/test/each.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "./suite.ts"; import { + createQueue, each, type Operation, resource, @@ -54,6 +55,26 @@ describe("each", () => { }); }); + it("can iterate subscriptions", async () => { + await run(function* () { + let queue = createQueue(); + + queue.add(1); + queue.add(2); + queue.add(3); + queue.close(); + + let items = []; + + for (let num of yield* each(queue)) { + items.push(num); + yield* each.next(); + } + + expect(items).toEqual([1, 2, 3]); + }); + }); + it("handles context correctly if you break out of a loop", async () => { await expect(run(function* () { let seq = sequence("hello world");