Combinators
Spawning tasks manually and waiting for them is fine, but there are patterns so common that Effection provides built-in combinators. These are safer and cleaner alternatives to Promise.all() and Promise.race().
The Problem with Promise.all()
async function fetchData(id: number): Promise<string> {
  await new Promise((r) => setTimeout(r, id * 100));
  if (id === 2) throw new Error("Fetch failed!");
  return `Data ${id}`;
}

async function main(): Promise<void> {
  try {
    // If id=2 fails, what happens to id=1 and id=3?
    const results = await Promise.all([
      fetchData(1),
      fetchData(2), // This throws!
      fetchData(3),
    ]);
    console.log(results);
  } catch (error) {
    console.log("Caught:", error);
    // id=1 and id=3 are still running in the background!
  }
}

main();
When one promise fails, Promise.all() rejects immediately - but the other promises keep running! They're left dangling, with no handle to cancel them.
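Cleaning up those stragglers yourself means threading an AbortController through every call by hand. Here's a sketch of what that looks like (the abortable sleep helper is made up for illustration):

```typescript
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(resolve, ms);
    signal.addEventListener("abort", () => {
      clearTimeout(timer); // Stop the pending timer so nothing lingers
      reject(new Error("aborted"));
    });
  });
}

async function fetchData(id: number, signal: AbortSignal): Promise<string> {
  await sleep(id * 100, signal);
  if (id === 2) throw new Error("Fetch failed!");
  return `Data ${id}`;
}

const controller = new AbortController();
try {
  await Promise.all([1, 2, 3].map((id) => fetchData(id, controller.signal)));
} catch (error) {
  // This is the cancellation bookkeeping Effection's combinators do for you:
  controller.abort(); // Halt id=1 and id=3 explicitly
  console.log("Caught:", (error as Error).message);
}
```

Every function in the chain has to accept and honor the signal - forget one, and that operation dangles anyway.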
Effection's all() Combinator
import type { Operation } from "effection";
import { main, all, sleep } from "effection";

function* fetchData(id: number): Operation<string> {
  console.log(`Starting fetch ${id}`);
  yield* sleep(id * 100);
  console.log(`Completed fetch ${id}`);
  return `Data ${id}`;
}

await main(function* () {
  const results: string[] = yield* all([
    fetchData(1),
    fetchData(2),
    fetchData(3),
  ]);
  console.log(results); // ['Data 1', 'Data 2', 'Data 3']
});
Output:
Starting fetch 1
Starting fetch 2
Starting fetch 3
Completed fetch 1
Completed fetch 2
Completed fetch 3
['Data 1', 'Data 2', 'Data 3']
all() waits for all operations to complete and returns an array of results in order.
all() with Error Handling
When one operation fails, all() halts the others:
import type { Operation } from "effection";
import { main, all, sleep } from "effection";

function* fetchData(id: number): Operation<string> {
  console.log(`Starting fetch ${id}`);
  yield* sleep(id * 100);
  if (id === 2) {
    console.log(`Fetch ${id} FAILED`);
    throw new Error("Fetch 2 failed!");
  }
  console.log(`Completed fetch ${id}`);
  return `Data ${id}`;
}

await main(function* () {
  try {
    const results: string[] = yield* all([
      fetchData(1),
      fetchData(2), // This throws after 200ms
      fetchData(3), // This gets halted!
    ]);
    console.log(results);
  } catch (error) {
    console.log("Caught:", (error as Error).message);
  }
});
Output:
Starting fetch 1
Starting fetch 2
Starting fetch 3
Completed fetch 1
Fetch 2 FAILED
Caught: Fetch 2 failed!
Notice that Completed fetch 3 never prints - it was halted when fetch 2 failed.
The Problem with Promise.race()
We covered this in the first chapter, but here's a reminder:
async function sleep(ms: number): Promise<void> {
  await new Promise((r) => setTimeout(r, ms));
}

async function main(): Promise<void> {
  console.time("total");
  await Promise.race([sleep(10), sleep(1000)]);
  console.timeEnd("total");
  // Process won't exit for 1000ms because the second timer is still running!
}

main();
Effection's race() Combinator
import { main, race, sleep } from "effection";

await main(function* () {
  console.time("total");
  yield* race([sleep(10), sleep(1000)]);
  console.timeEnd("total"); // ~10ms, and process exits immediately!
});
When one operation wins, race() halts all the losers. No leaked effects!
race() with Return Values
race() returns the value of the winning operation:
import type { Operation } from "effection";
import { main, race, sleep } from "effection";

function* fetchFromAPI(name: string, delay: number): Operation<string> {
  console.log(`${name}: starting (${delay}ms)`);
  yield* sleep(delay);
  console.log(`${name}: completed`);
  return `Response from ${name}`;
}

await main(function* () {
  const winner: string = yield* race([
    fetchFromAPI("fast-api", 100),
    fetchFromAPI("slow-api", 500),
  ]);
  console.log("Winner:", winner);
});
Output:
fast-api: starting (100ms)
slow-api: starting (500ms)
fast-api: completed
Winner: Response from fast-api
Notice slow-api: completed never prints - it was halted.
Practical Example: Timeout Pattern
A common pattern is racing an operation against a timeout:
import type { Operation } from "effection";
import { main, race, sleep } from "effection";

class TimeoutError extends Error {
  constructor(ms: number) {
    super(`Operation timed out after ${ms}ms`);
    this.name = "TimeoutError";
  }
}

function* timeout<T>(ms: number): Operation<T> {
  yield* sleep(ms);
  throw new TimeoutError(ms);
}

function* withTimeout<T>(operation: Operation<T>, ms: number): Operation<T> {
  return yield* race([operation, timeout<T>(ms)]);
}

// Simulated slow API
function* slowFetch(): Operation<string> {
  yield* sleep(5000);
  return "data";
}

await main(function* () {
  try {
    const result: string = yield* withTimeout(slowFetch(), 1000);
    console.log("Result:", result);
  } catch (error) {
    if (error instanceof TimeoutError) {
      console.log("Request timed out!");
    } else {
      throw error;
    }
  }
});
Output:
Request timed out!
The slow fetch is automatically halted when the timeout fires.
Practical Example: First Successful Response
Sometimes you want to query multiple services and use the first one that succeeds:
import type { Operation } from "effection";
import { main, race, sleep } from "effection";

function* fetchWeather(
  service: string,
  delay: number,
  shouldFail: boolean,
): Operation<string> {
  console.log(`${service}: fetching...`);
  yield* sleep(delay);
  if (shouldFail) {
    throw new Error(`${service} failed`);
  }
  return `Weather from ${service}: Sunny, 72°F`;
}

await main(function* () {
  try {
    const weather: string = yield* race([
      fetchWeather("service-a", 100, true), // Fast but fails
      fetchWeather("service-b", 200, false), // Slower but succeeds
      fetchWeather("service-c", 300, false), // Slowest
    ]);
    console.log(weather);
  } catch (error) {
    console.log("All services failed");
  }
});
Wait - this doesn't quite work! If service-a fails first, the whole race fails. What we really want is "first to succeed".
Building a firstSuccess Combinator
We can build this ourselves using Signals (covered in the Signals chapter):
import type { Operation } from "effection";
import { spawn, scoped, createSignal, each } from "effection";

type Outcome<T> = { ok: true; value: T } | { ok: false };

function* firstSuccess<T>(operations: (() => Operation<T>)[]): Operation<T> {
  const outcomes = createSignal<Outcome<T>, never>();
  return yield* scoped(function* () {
    for (const op of operations) {
      yield* spawn(function* () {
        try {
          const value = yield* op();
          outcomes.send({ ok: true, value }); // No yield* needed - send() is synchronous!
        } catch {
          outcomes.send({ ok: false }); // Report the failure so we can count it
        }
      });
    }
    // Wait for outcomes - blocks until send() is called
    let failures = 0;
    for (const outcome of yield* each(outcomes)) {
      if (outcome.ok) {
        return outcome.value; // Got one! The scope halts the other tasks automatically
      }
      if (++failures === operations.length) break; // Every task failed
      yield* each.next();
    }
    throw new Error("All operations failed");
  });
}
No polling! The Signal lets tasks communicate directly. When any task succeeds, it sends the result, and we immediately receive it.
Combining all() and spawn()
For complex orchestration, combine these tools:
import type { Operation } from "effection";
import { main, all, sleep } from "effection";

interface User {
  id: number;
  name: string;
}

interface Post {
  id: number;
  title: string;
  userId: number;
}

interface Comment {
  id: number;
  text: string;
  postId: number;
}

function* fetchUser(id: number): Operation<User> {
  yield* sleep(100);
  return { id, name: `User ${id}` };
}

function* fetchPosts(userId: number): Operation<Post[]> {
  yield* sleep(150);
  return [
    { id: 1, title: "First Post", userId },
    { id: 2, title: "Second Post", userId },
  ];
}

function* fetchComments(postId: number): Operation<Comment[]> {
  yield* sleep(50);
  return [
    { id: 1, text: "Great!", postId },
    { id: 2, text: "Thanks!", postId },
  ];
}

await main(function* () {
  console.time("total");

  // Fetch user first
  const user: User = yield* fetchUser(1);

  // Fetch all posts
  const posts: Post[] = yield* fetchPosts(user.id);

  // Fetch comments for all posts in parallel!
  const allComments: Comment[][] = yield* all(
    posts.map((post) => fetchComments(post.id)),
  );

  console.log({
    user,
    posts,
    comments: allComments.flat(),
  });

  console.timeEnd("total"); // ~300ms total
});
Key Takeaways
- all() is like Promise.all() - but halts remaining operations on failure
- race() is like Promise.race() - but halts losers, no leaked effects
- Combine with spawn() for complex orchestration
- Use race() for timeouts - race your operation against a timer
- No dangling operations - structured concurrency guarantees cleanup