EventBus usage
The @daiso-tech/core/event-bus component provides a way for dispatching and listening to events independent of underlying technology.
Initial configuration
To begin using the EventBus class, you'll need to create and configure an instance:
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import type { IEventBus } from "@daiso-tech/core/event-bus/contracts";
import { EventBus } from "@daiso-tech/core/event-bus";
const eventBus: IEventBus = new EventBus({
// You can choose the adapter to use
adapter: new MemoryEventBusAdapter(),
});
Here is a complete list of settings for the EventBus class.
Event handling basics
Registering Listeners and Dispatching Events
Event listeners can be added to respond to specific events:
await eventBus.addListener("add", (event) => {
console.log(event);
});
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
Note EventBus class instance uses Task instead of a regular Promise. This means you must either await the Task or call its detach method to run it.
Refer to the @daiso-tech/core/task documentation for further information.
Listener management
To properly remove a listener, you must use a named function:
import type { BaseEvent } from "@daiso-tech/core/event-bus/contracts";
const listener = (event: BaseEvent) => {
console.log(event);
};
await eventBus.addListener("add", listener);
await eventBus.removeListener("add", listener);
// The listener is removed before dispatch and won't be triggered.
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
Patterns
Compile time type safety
An event map can be used to strictly type the events:
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import type { IEventBus } from "@daiso-tech/core/event-bus/contracts";
import { EventBus } from "@daiso-tech/core/event-bus";
type AddEvent = {
a: number;
b: number;
};
type EventMap = {
add: AddEvent;
};
const eventBus = new EventBus<EventMap>({
adapter: new MemoryEventBusAdapter(),
});
// A typescript error will show up because the event name doesnt exist.
await eventBus.dispatch("addd", {
a: 2,
b: 2,
});
// A typescript error will show up because the event fields doesnt match
await eventBus.dispatch("add", {
nbr1: 1,
nbr2: 2,
});
// A typescript error will show up because the event name doesnt exist.
await eventBus.addListener("addd", (event) => {
console.log(event);
});
Runtime type safety
You can enforce runtime and compiletime type safety by passing standard schema to the cache:
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import { EventBus } from "@daiso-tech/core/event-bus";
import { z } from "zod";
const eventMapSchema = {
add: z.object({
a: z.number(),
b: z.number(),
}),
};
// The event type will be infered
const eventBus = new EventBus({
adapter: new MemoryEventBusAdapter(),
eventMapSchema,
});
// A typescript and runtime error will show up because the event fields doesnt match
await eventBus.dispatch("add", {
nbr1: 1,
nbr2: 2,
});
Subscribe method
The subscription pattern provides automatic cleanup through an unsubscribe function:
const unsubscribe = await eventBus.subscribe("add", (event) => {
console.log(event);
});
await eventBus.dispatch("add", {
a: 20,
b: 5,
});
await unsubscribe();
One-Time event handling
For listeners that should only trigger once:
await eventBus.listenOnce("add", (event) => {
console.log(event);
});
// Listener will be only triggered here
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
// Listener will not be triggered because it removed after the first dispatch.
await eventBus.dispatch("add", {
a: 3,
b: 3,
});
You can also cancel one-time listeners before they trigger:
import type { BaseEvent } from "@daiso-tech/core/event-bus/contracts";
const listener = (event: BaseEvent) => {
console.log(event);
};
await eventBus.listenOnce("add", listener);
await eventBus.removeListener("add", listener);
// The listener is removed before dispatch and won't be triggered.
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
The subscribeOnce method creates a one-time listener and returns an unsubscribe function:
const unsubscribe = await eventBus.subscribeOnce("add", (event) => {
console.log(event);
});
await unsubscribe();
await eventBus.dispatch("add", {
a: 5,
b: 5,
});
Task-based event handling
Wait for events using tasks:
import { Task } from "@daiso-tech/core/task";
import { TimeSpan } from "@daiso-tech/core/time-span";
// This code block will run concurrently
(async () => {
// We wait on second and thereafter dispatch the event.
await Task.delay(TimeSpan.fromSeconds(1));
await eventBus.dispatch("add", {
a: 30,
b: 20,
});
})();
// The promise will resolve after one second, when the event is dispatched.
const event = await eventBus.asTask("add");
console.log(event);
Separating dispatching and listening
The library includes two additional contracts:
-
IEventDispatcher– Allows only event dispatching. -
IEventListenable– Allows only event listening.
This seperation makes it easy to visually distinguish the two contracts, making it immediately obvious that they serve different purposes.
import type {
IEventBus,
IEventListenable,
IEventDispatcher,
} from "@daiso-tech/core/event-bus/contracts";
import { MemoryEventBusAdapter } from "@daiso-tech/core/event-bus/memory-event-bus-adapter";
import { EventBus } from "@daiso-tech/core/event-bus";
type AddEvent = {
a: number;
b: number;
};
type EventMap = {
add: AddEvent;
};
async function listenerFunc(
eventListenable: IEventListenable<EventMap>,
): Promise<void> {
// You cannot access the dispatch method
// You will get typescript error if you try
await eventListenable.addListener("add", (event) => {
console.log("EVENT:", event);
});
}
async function dispatchingFunc(
eventDispatcher: IEventDispatcher<EventMap>,
): Promise<void> {
// You cannot access the listener methods
// You will get typescript error if you try
await eventDispatcher.dispatch("add", {
a: 20,
b: 5,
});
}
const eventBus: IEventBus<any> = new EventBus({
// You can choose the adapter to use
adapter: new MemoryEventBusAdapter(),
});
await listenerFunc(eventBus);
await dispatchingFunc(eventBus);
Invokable listeners
An event listener is Invokable meaning you can also pass in an object (class instance or object literal) as listener:
For further information refer the Invokable docs.
type AddEvent = {
a: number;
b: number;
};
class Listener implements IEventListenerObject<AddEvent> {
private count = 0;
invoke(event: AddEvent): void {
console.log("EVENT:", event);
console.log("COUNT:", count);
this.count++;
}
}
await eventBus.addListener("add", new Listener());
await eventBus.dispatch("add", {
a: 1,
b: 2,
});
await eventBus.dispatch("add", {
a: 3,
b: -1,
});
Namespacing
You can use the Namespace class to group related without conflicts. Since namespacing is not used be default, you need to pass an obeject that implements INamespace.
For further information about namespacing refer to @daiso-tech/core/namespace documentation.
import { Namespace } from "@daiso-tech/core/namespace";
import { RedisPubSubEventBusAdapter } from "@daiso-tech/core/event-bus/redis-pub-sub-event-bus-adapter";
import { EventBus } from "@daiso-tech/core/event-bus";
import { Serde } from "@daiso-tech/core/serde";
import { SuperJsonSerdeAdapter } from "@daiso-tech/core/serde/super-json-serde-adapter";
import Redis from "ioredis";
const client = new Redis("YOUR_REDIS_CONNECTION_STRING");
const serde = new Serde(new SuperJsonSerdeAdapter());
const eventBusA = new EventBus({
namespace: new Namespace("@eventBus-a"),
adapter: new RedisPubSubEventBusAdapter({
client,
serde,
}),
});
const eventBusB = new EventBus({
namespace: new Namespace("@eventBus-b"),
adapter: new RedisPubSubEventBusAdapter({
client,
serde,
}),
});
await eventBusA.addListener("test", (event) => {
console.log("TEST_A:", event);
});
await eventBusB.addListener("test", () => {
console.log("TEST_B", event);
});
// Will only log "TEST_A" { testA: true }
await eventBusA.dispatch("test", {
testA: true,
});
// Will only log "TEST_B" { testB: true }
await eventBusB.dispatch("test", {
testB: true,
});
Further information
For further information refer to @daiso-tech/core/event-bus API docs.