EventBus
The @daiso-tech/core/event-bus component provides a way to dispatch and listen to events independently of the underlying technology.
Initial configuration
To begin using the EventBus class, you'll need to create and configure an instance:
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import type { IEventBus } from "@daiso-tech/core/event-bus/contracts";
import { EventBus } from "@daiso-tech/core/event-bus";
const eventBus: IEventBus = new EventBus({
// You can choose the adapter to use
adapter: new MemoryEventBusAdapter(),
});
Here is a complete list of settings for the EventBus class.
Event handling basics
Registering Listeners and Dispatching Events
Event listeners can be added to respond to specific events:
await eventBus.addListener("add", (event) => {
console.log(event);
});
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
Note: An EventBus instance returns a Task instead of a regular Promise. A Task does not run on its own, so you must either await it or call its detach method to run it.
Refer to the @daiso-tech/core/task documentation for further information.
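The await-or-detach rule can be illustrated with a minimal sketch. LazyTask below is a hypothetical stand-in written for this example, not the library's Task class; it only shows why work that is neither awaited nor detached never starts.

```typescript
// Hypothetical stand-in for a lazy task; NOT the real Task from @daiso-tech/core/task.
class LazyTask<T> implements PromiseLike<T> {
    private promise: Promise<T> | null = null;
    private readonly run: () => Promise<T>;

    constructor(run: () => Promise<T>) {
        this.run = run;
    }

    // Starts the work on first use and reuses the same promise afterwards.
    private start(): Promise<T> {
        if (this.promise === null) {
            this.promise = this.run();
        }
        return this.promise;
    }

    // `await` calls this method, which is what triggers execution.
    then<A = T, B = never>(
        onFulfilled?: ((value: T) => A | PromiseLike<A>) | null,
        onRejected?: ((reason: unknown) => B | PromiseLike<B>) | null,
    ): Promise<A | B> {
        return this.start().then(onFulfilled, onRejected);
    }

    // Fire and forget: starts the work without waiting for the result.
    detach(): void {
        void this.start().catch(() => {});
    }
}

const log: string[] = [];
const createTask = (name: string): LazyTask<void> =>
    new LazyTask(async () => {
        log.push(name);
    });

createTask("never-started"); // Neither awaited nor detached, so it never runs.
createTask("detached").detach(); // Starts immediately and runs in the background.
```

In this sketch only the detached task records its name. Forgetting both await and detach fails silently, which is the situation the note warns about.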
Listener management
To remove a listener, you must pass the same function reference that was registered, so store the listener in a variable or use a named function:
import type { BaseEvent } from "@daiso-tech/core/event-bus/contracts";
const listener = (event: BaseEvent) => {
console.log(event);
};
await eventBus.addListener("add", listener);
await eventBus.removeListener("add", listener);
// The listener is removed before dispatch and won't be triggered.
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
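Removal depends on function identity. The sketch below uses a plain Set as a hypothetical listener registry (an assumption for illustration, not the library's internals) to show why an identical-looking inline function cannot be removed.

```typescript
type Listener = (event: unknown) => void;

// Hypothetical registry used only to illustrate reference identity.
const listeners = new Set<Listener>();

const storedListener: Listener = (event) => {
    console.log(event);
};

listeners.add(storedListener);
// Same reference, so the listener is found and removed.
const removedStored = listeners.delete(storedListener);

listeners.add((event) => {
    console.log(event);
});
// This creates a new function object, so nothing in the registry matches it.
const removedInline = listeners.delete((event) => {
    console.log(event);
});
```

Here removedStored is true and removedInline is false, and the inline listener stays registered.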
Patterns
Compile time type safety
An event map can be used to strictly type the events:
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import type { IEventBus } from "@daiso-tech/core/event-bus/contracts";
import { EventBus } from "@daiso-tech/core/event-bus";
type AddEvent = {
a: number;
b: number;
};
type EventMap = {
add: AddEvent;
};
const eventBus = new EventBus<EventMap>({
adapter: new MemoryEventBusAdapter(),
});
// A TypeScript error will show up because the event name doesn't exist.
await eventBus.dispatch("addd", {
a: 2,
b: 2,
});
// A TypeScript error will show up because the event fields don't match.
await eventBus.dispatch("add", {
nbr1: 1,
nbr2: 2,
});
// A TypeScript error will show up because the event name doesn't exist.
await eventBus.addListener("addd", (event) => {
console.log(event);
});
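The kind of checking shown above can be expressed with keyof and indexed access types. The following is a minimal sketch of that typing technique; dispatchTyped is a hypothetical function written for this example, and it is not claimed to be how EventBus is typed internally.

```typescript
type AddEvent = {
    a: number;
    b: number;
};
type EventMap = {
    add: AddEvent;
};

const dispatched: string[] = [];

// The event name is limited to the keys of the map,
// and the payload type is looked up from the chosen name.
function dispatchTyped<TName extends keyof EventMap>(
    name: TName,
    event: EventMap[TName],
): void {
    dispatched.push(`${name}:${JSON.stringify(event)}`);
}

dispatchTyped("add", { a: 2, b: 2 });

// Both lines below would be rejected by the compiler if uncommented:
// dispatchTyped("addd", { a: 2, b: 2 }); // "addd" is not a key of EventMap.
// dispatchTyped("add", { nbr1: 1, nbr2: 2 }); // The fields do not match AddEvent.
```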
Runtime type safety
You can enforce both runtime and compile-time type safety by passing Standard Schema compliant schemas to the event bus through the eventMapSchema setting:
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import { EventBus } from "@daiso-tech/core/event-bus";
import { z } from "zod";
const eventMapSchema = {
add: z.object({
a: z.number(),
b: z.number(),
}),
};
// The event types will be inferred from the schema
const eventBus = new EventBus({
adapter: new MemoryEventBusAdapter(),
eventMapSchema,
});
// A TypeScript error and a runtime error will show up because the event fields don't match.
await eventBus.dispatch("add", {
nbr1: 1,
nbr2: 2,
});
Subscribe method
The subscription pattern provides automatic cleanup through an unsubscribe function:
const unsubscribe = await eventBus.subscribe("add", (event) => {
console.log(event);
});
await eventBus.dispatch("add", {
a: 20,
b: 5,
});
await unsubscribe();
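The relationship between subscribe and the addListener/removeListener pair can be sketched as follows. TinyEventBus is a hypothetical, synchronous, in-memory class written for this example; the library's methods return Tasks and its internals may differ.

```typescript
type Listener<TEvent> = (event: TEvent) => void;

// Hypothetical synchronous bus used only to illustrate the pattern.
class TinyEventBus<TEvent> {
    private readonly listeners = new Set<Listener<TEvent>>();

    addListener(listener: Listener<TEvent>): void {
        this.listeners.add(listener);
    }

    removeListener(listener: Listener<TEvent>): void {
        this.listeners.delete(listener);
    }

    dispatch(event: TEvent): void {
        for (const listener of this.listeners) {
            listener(event);
        }
    }

    // Registers the listener and returns a function that removes that same reference.
    subscribe(listener: Listener<TEvent>): () => void {
        this.addListener(listener);
        return () => {
            this.removeListener(listener);
        };
    }
}

const received: number[] = [];
const bus = new TinyEventBus<{ a: number; b: number }>();
const unsubscribeFromBus = bus.subscribe((event) => {
    received.push(event.a + event.b);
});

bus.dispatch({ a: 20, b: 5 }); // Delivered to the listener.
unsubscribeFromBus();
bus.dispatch({ a: 1, b: 1 }); // Not delivered, the listener has been removed.
```

Because the returned function closes over the listener reference, the caller can pass an inline function and still remove it later.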
One-Time event handling
For listeners that should only trigger once:
await eventBus.listenOnce("add", (event) => {
console.log(event);
});
// The listener will only be triggered here
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
// The listener will not be triggered because it is removed after the first dispatch.
await eventBus.dispatch("add", {
a: 3,
b: 3,
});
You can also cancel one-time listeners before they trigger:
import type { BaseEvent } from "@daiso-tech/core/event-bus/contracts";
const listener = (event: BaseEvent) => {
console.log(event);
};
await eventBus.listenOnce("add", listener);
await eventBus.removeListener("add", listener);
// The listener is removed before dispatch and won't be triggered.
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
The subscribeOnce method creates a one-time listener and returns an unsubscribe function:
const unsubscribe = await eventBus.subscribeOnce("add", (event) => {
console.log(event);
});
await unsubscribe();
// The listener is unsubscribed before dispatch and won't be triggered.
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
Promise-based event handling
Wait for events using promises:
import { Task } from "@daiso-tech/core/task";
import { TimeSpan } from "@daiso-tech/core/time-span";
// This code block will run concurrently
(async () => {
// Wait one second and then dispatch the event.
await Task.delay(TimeSpan.fromSeconds(1));
await eventBus.dispatch("add", {
a: 30,
b: 20,
});
})();
// The promise will resolve after one second, when the event is dispatched.
const event = await eventBus.asPromise("add");
console.log(event);
Separating dispatching and listening
The library includes two additional contracts:
- IEventDispatcher: Allows only event dispatching.
- IEventListenable: Allows only event listening.
This separation makes the two contracts easy to tell apart and makes it immediately obvious that they serve different purposes.
import type {
IEventBus,
IEventListenable,
IEventDispatcher,
} from "@daiso-tech/core/event-bus/contracts";
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import { EventBus } from "@daiso-tech/core/event-bus";
const eventBus: IEventBus = new EventBus({
// You can choose the adapter to use
adapter: new MemoryEventBusAdapter(),
});
type AddEvent = {
a: number;
b: number;
};
type EventMap = {
add: AddEvent;
};
async function listenerFunc(
eventListenable: IEventListenable<EventMap>,
): Promise<void> {
// You cannot access the dispatch method
// You will get a TypeScript error if you try
await eventListenable.addListener("add", (event) => {
console.log("EVENT:", event);
});
}
async function dispatchingFunc(
eventDispatcher: IEventDispatcher<EventMap>,
): Promise<void> {
// You cannot access the listener methods
// You will get a TypeScript error if you try
await eventDispatcher.dispatch("add", {
a: 20,
b: 5,
});
}
await listenerFunc(eventBus);
await dispatchingFunc(eventBus);
Invokable listeners
An event listener is Invokable, meaning you can also pass an object (a class instance or an object literal) as the listener.
For further information refer to the Invokable docs.
type AddEvent = {
a: number;
b: number;
};
class Listener implements IEventListenerObject<AddEvent> {
private count = 0;
invoke(event: AddEvent): void {
console.log("EVENT:", event);
console.log("COUNT:", this.count);
this.count++;
}
}
await eventBus.addListener("add", new Listener());
await eventBus.dispatch("add", {
a: 1,
b: 2,
});
await eventBus.dispatch("add", {
a: 3,
b: -1,
});
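One way a bus can accept both listener shapes is to check the listener's type before calling it. The sketch below is an assumption for illustration: ListenerFn, ListenerObject, AnyListener and callListener are hypothetical names, not the library's types or internals.

```typescript
type ListenerFn<TEvent> = (event: TEvent) => void;
type ListenerObject<TEvent> = { invoke(event: TEvent): void };
type AnyListener<TEvent> = ListenerFn<TEvent> | ListenerObject<TEvent>;

// Calls a plain function directly, otherwise delegates to the object's invoke method.
function callListener<TEvent>(listener: AnyListener<TEvent>, event: TEvent): void {
    if (typeof listener === "function") {
        listener(event);
    } else {
        listener.invoke(event);
    }
}

type SumEvent = {
    a: number;
    b: number;
};

// An object listener can keep state between events.
class SumListener implements ListenerObject<SumEvent> {
    total = 0;
    invoke(event: SumEvent): void {
        this.total += event.a + event.b;
    }
}

const sums: number[] = [];
const objectListener = new SumListener();

callListener(objectListener, { a: 1, b: 2 });
callListener(
    (event: SumEvent) => {
        sums.push(event.a + event.b);
    },
    { a: 3, b: -1 },
);
```

The object form is convenient when a listener needs state or injected dependencies, while the function form stays the shortest option for simple handlers.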
Namespacing
You can use the Namespace class to group related events without conflicts, even when several event buses share the same underlying client.
For further information about namespacing refer to @daiso-tech/core/namespace documentation.
import { Namespace } from "@daiso-tech/core/namespace";
import { RedisPubSubEventBusAdapter } from "@daiso-tech/core/event-bus/redis-pub-sub-event-bus-adapter";
import { EventBus } from "@daiso-tech/core/event-bus";
import { Serde } from "@daiso-tech/core/serde";
import { SuperJsonSerdeAdapter } from "@daiso-tech/core/serde/super-json-serde-adapter";
import Redis from "ioredis";
const client = new Redis("YOUR_REDIS_CONNECTION_STRING");
const serde = new Serde(new SuperJsonSerdeAdapter());
const eventBusA = new EventBus({
namespace: new Namespace("@eventBus-a"),
adapter: new RedisPubSubEventBusAdapter({
client,
serde,
}),
});
const eventBusB = new EventBus({
namespace: new Namespace("@eventBus-b"),
adapter: new RedisPubSubEventBusAdapter({
client,
serde,
}),
});
await eventBusA.addListener("test", (event) => {
console.log("TEST_A:", event);
});
await eventBusB.addListener("test", (event) => {
console.log("TEST_B:", event);
});
// Will only log "TEST_A" { testA: true }
await eventBusA.dispatch("test", {
testA: true,
});
// Will only log "TEST_B" { testB: true }
await eventBusB.dispatch("test", {
testB: true,
});
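The isolation shown above can be pictured as prefixing every event name with the namespace before it reaches the shared transport. The following is a minimal sketch under that assumption; PrefixedBus and the "namespace/event" key format are hypothetical and are not taken from the Namespace class.

```typescript
type Handler = (event: unknown) => void;

// Stands in for a shared transport such as a single Redis connection.
const sharedChannels = new Map<string, Handler[]>();

// Hypothetical bus that prefixes every event name with its namespace.
// The "namespace/event" key format is an assumption made for this sketch.
class PrefixedBus {
    private readonly prefix: string;

    constructor(prefix: string) {
        this.prefix = prefix;
    }

    private key(eventName: string): string {
        return `${this.prefix}/${eventName}`;
    }

    addListener(eventName: string, handler: Handler): void {
        const key = this.key(eventName);
        const handlers = sharedChannels.get(key) ?? [];
        handlers.push(handler);
        sharedChannels.set(key, handlers);
    }

    dispatch(eventName: string, event: unknown): void {
        for (const handler of sharedChannels.get(this.key(eventName)) ?? []) {
            handler(event);
        }
    }
}

const output: string[] = [];
const busA = new PrefixedBus("@eventBus-a");
const busB = new PrefixedBus("@eventBus-b");

busA.addListener("test", () => {
    output.push("TEST_A");
});
busB.addListener("test", () => {
    output.push("TEST_B");
});

// Both buses use the event name "test", but only the listener on busA runs.
busA.dispatch("test", { testA: true });
```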
Further information
For further information refer to @daiso-tech/core/event-bus API docs.