Effection Logo

Channels

So far our operations have been one-way streets: run, return a value, done. But real applications need ongoing conversations—streams of data flowing between concurrent operations.

  • Messages between workers
  • Events in an internal event bus
  • Data flowing through a pipeline

Think of a Channel like a hallway intercom system. Operations can broadcast messages, and any operation that's listening gets a copy. Effection's Channels make this communication structured and safe.

What's a Channel?

A Channel is a pub/sub system for Effection operations. One operation sends messages, others receive them.

import type { Operation, Channel, Subscription } from "effection";
import { main, createChannel, spawn, sleep } from "effection";

await main(function* () {
  // Create a channel that sends strings
  const channel: Channel<string, void> = createChannel<string, void>();

  // Subscribe to the channel
  // When you yield* a channel, you get a Subscription - your personal message queue
  const subscription: Subscription<string, void> = yield* channel;

  // Send some messages in the background
  yield* spawn(function* (): Operation<void> {
    yield* channel.send("hello");
    yield* sleep(100);
    yield* channel.send("world");
    yield* sleep(100);
    yield* channel.close(); // Close the channel
  });

  // Receive messages
  let result = yield* subscription.next();
  while (!result.done) {
    console.log("Received:", result.value);
    result = yield* subscription.next();
  }

  console.log("Channel closed");
});

Output:

Received: hello
Received: world
Channel closed

Important: Subscribe Before Sending

Channels are not buffered. If no one is subscribed, messages are dropped:

import type { Channel, Subscription } from "effection";
import { main, createChannel } from "effection";

await main(function* () {
  const channel: Channel<string, void> = createChannel<string, void>();

  // Send before subscribing - message is LOST!
  yield* channel.send("this is lost");

  // Now subscribe
  const subscription: Subscription<string, void> = yield* channel;

  yield* channel.send("this is received");

  const result = yield* subscription.next();
  console.log(result.value); // 'this is received'
});

The for yield* each Pattern

Manually calling next() is tedious. Use each for cleaner iteration:

import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

await main(function* () {
  const channel: Channel<number, void> = createChannel<number, void>();

  // Producer
  yield* spawn(function* (): Operation<void> {
    for (let i = 1; i <= 5; i++) {
      yield* channel.send(i);
      yield* sleep(100);
    }
    yield* channel.close();
  });

  // Consumer with each()
  for (const value of yield* each(channel)) {
    console.log("Got:", value);
    yield* each.next(); // REQUIRED!
  }

  console.log("Done");
});

Output:

Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Done

Important: You MUST call yield* each.next() at the end of each loop iteration!

Why yield* each.next()?

This might seem strange. The reason is that it allows you to do async work between receiving a value and requesting the next one:

import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

await main(function* () {
  const channel: Channel<string, void> = createChannel<string, void>();

  yield* spawn(function* (): Operation<void> {
    yield* channel.send("task-1");
    yield* channel.send("task-2");
    yield* channel.send("task-3");
    yield* channel.close();
  });

  for (const task of yield* each(channel)) {
    console.log("Processing:", task);
    yield* sleep(500); // Simulate slow processing
    console.log("Finished:", task);
    yield* each.next(); // Now request next item
  }
});

This gives you backpressure control - you only request the next item when you're ready.

Multiple Subscribers

Channels support multiple subscribers - each gets their own copy of every message:

import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

await main(function* () {
  const channel: Channel<string, void> = createChannel<string, void>();

  // Two subscribers
  yield* spawn(function* (): Operation<void> {
    console.log("Subscriber A starting");
    for (const msg of yield* each(channel)) {
      console.log("A received:", msg);
      yield* each.next();
    }
    console.log("Subscriber A done");
  });

  yield* spawn(function* (): Operation<void> {
    console.log("Subscriber B starting");
    for (const msg of yield* each(channel)) {
      console.log("B received:", msg);
      yield* each.next();
    }
    console.log("Subscriber B done");
  });

  // Give subscribers time to start
  yield* sleep(10);

  // Send messages
  yield* channel.send("hello");
  yield* channel.send("world");
  yield* channel.close();

  yield* sleep(100);
});

Output:

Subscriber A starting
Subscriber B starting
A received: hello
B received: hello
A received: world
B received: world
Subscriber A done
Subscriber B done

Each subscriber has their own queue and receives all messages independently.

Practical Example: Event Bus

Use a channel as an internal event bus:

import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

interface AppEvent {
  type: string;
  payload: unknown;
}

// Create a global event bus
const eventBus: Channel<AppEvent, void> = createChannel<AppEvent, void>();

// Logger that prints all events
function* eventLogger(): Operation<void> {
  for (const event of yield* each(eventBus)) {
    console.log(`[LOG] ${event.type}:`, event.payload);
    yield* each.next();
  }
}

// Analytics that counts events
function* analytics(): Operation<void> {
  const counts: Record<string, number> = {};

  for (const event of yield* each(eventBus)) {
    counts[event.type] = (counts[event.type] || 0) + 1;
    console.log(`[ANALYTICS] Event counts:`, counts);
    yield* each.next();
  }
}

await main(function* () {
  // Start consumers
  yield* spawn(eventLogger);
  yield* spawn(analytics);

  yield* sleep(10);

  // Emit some events
  yield* eventBus.send({ type: "user.login", payload: { userId: 1 } });
  yield* eventBus.send({ type: "page.view", payload: { page: "/home" } });
  yield* eventBus.send({ type: "user.login", payload: { userId: 2 } });

  yield* sleep(100);
});

Output:

[LOG] user.login: { userId: 1 }
[ANALYTICS] Event counts: { 'user.login': 1 }
[LOG] page.view: { page: '/home' }
[ANALYTICS] Event counts: { 'user.login': 1, 'page.view': 1 }
[LOG] user.login: { userId: 2 }
[ANALYTICS] Event counts: { 'user.login': 2, 'page.view': 1 }

But Wait... What About Callbacks?

There's a limitation we haven't addressed:

// This doesn't work!
await main(function* () {
  const channel = createChannel<MouseEvent, void>();

  document.addEventListener("click", (event) => {
    yield * channel.send(event); // SyntaxError! Can't yield* in a callback
  });
});

channel.send() is an operation - you can only call it with yield*. But callbacks are plain JavaScript functions!

This is a fundamental problem:

  • Channels work great when both producer and consumer are Effection operations
  • But external events (DOM clicks, Node.js EventEmitters, timers) come from callbacks

The next chapter introduces Signals - which solve this problem by providing a synchronous send() function that can be called from anywhere.

Key Takeaways

Channels are the hallway intercom for your operations:

  1. Channels are internal pub/sub - structured communication between Effection operations
  2. Subscribe before sending - if nobody's listening, messages vanish into the void
  3. Use for yield* each - cleaner than manual next() calls
  4. Always call yield* each.next() - explicitly request the next message
  5. Multiple subscribers - everyone on the intercom hears every message
  • PreviousResources
  • NextSignals