The @ooneex/queue component is a background job layer built on BullMQ. You extend the abstract Queue base class, declare the data your jobs carry, and implement a handler that processes each job. The base class wires the producer (add, addBulk, removeJob) and the consumer (a BullMQ Worker) together over a Redis connection, and forwards worker lifecycle events to optional hooks on your class.

Why this component

  • One base class, producer and consumer. Extend Queue<T, R> and you get typed add, addBulk, removeJob, and close methods plus a worker that runs your handler.
  • Type-safe jobs. The job data type T (a record of scalars) and the return type R flow through add and handler so producers and consumers stay in sync.
  • Lifecycle hooks. Implement optional onCompleted, onFailed, onError, and other hooks; the base class binds them to the worker automatically.
  • Redis-backed. Jobs persist in Redis through BullMQ, so they survive restarts and scale across worker instances.
  • Container-managed. Register a queue class with a decorator and resolve it from the container.

How it works

A queue class extends Queue<T, R>, where T extends Record<string, ScalarType> is the job data and R is the value the handler returns. Inside the constructor you create a BullMQ Queue (the producer) and a Worker (the consumer) over a shared Redis connection, then call registerEvents() to bind your hooks. The producer side enqueues work:
Method | Purpose
add(name, data, opts?) | Enqueue one job; returns the created Job<T, R>.
addBulk(jobs) | Enqueue many jobs at once; returns the created jobs.
removeJob(id) | Remove a job by id; returns the number removed.
close() | Close the worker and the queue connections.
The consumer side processes work. You implement the abstract handler:
public abstract handler(job: Job<T, R>): QueueHandlerReturnType<R>;
QueueHandlerReturnType<R> is Promise<R> | R, so the handler may be sync or async. The worker invokes it for each job and the returned value becomes the job result (surfaced to onCompleted).
Worker lifecycle events are optional hooks on IQueue. Declare any of them and registerEvents() attaches them:
Hook | Fires when
onActive(job, prev) | A job moved to active and started processing.
onCompleted(job, result, prev) | A job finished successfully; result is the handler’s return value.
onFailed(job, error, prev) | A job threw during processing (job may be undefined).
onProgress(job, progress) | A job reported progress via job.updateProgress.
onStalled(jobId, prev) | A job was detected as stalled and is about to be retried.
onDrained() | The queue had no more jobs to process.
onPaused() / onResumed() | The worker was paused or resumed.
onClosing(message) / onClosed() | The worker is closing or has closed.
onReady() | The worker connected to Redis and is ready.
onError(error) | The worker hit an error unrelated to a specific job. Always attach this to avoid unhandled exceptions.
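The hook binding described above can be pictured with a small in-memory model. This is only a sketch of the idea, not the implementation in @ooneex/queue or BullMQ: TinyEmitter, SketchJob, SketchQueue, UppercaseQueue, and the process method are hypothetical names, and only two of the hooks are modeled.

```typescript
// Hypothetical stand-ins; none of these names come from @ooneex/queue or BullMQ.
type Listener = (...args: any[]) => void;

// Plays the role of the worker's event emitter.
class TinyEmitter {
  private readonly listeners = new Map<string, Listener[]>();

  public on(event: string, listener: Listener): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  public emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args);
  }
}

type SketchJob<T> = { id: string; data: T };
type HandlerReturn<R> = Promise<R> | R;

abstract class SketchQueue<T, R> {
  protected readonly worker = new TinyEmitter();

  public abstract handler(job: SketchJob<T>): HandlerReturn<R>;

  // Optional hooks: a subclass declares only the ones it needs.
  public onCompleted?(job: SketchJob<T>, result: R): void;
  public onFailed?(job: SketchJob<T> | undefined, error: Error): void;

  // Attach each hook that the subclass actually defines.
  protected registerEvents(): void {
    if (this.onCompleted) this.worker.on("completed", this.onCompleted.bind(this));
    if (this.onFailed) this.worker.on("failed", this.onFailed.bind(this));
  }

  // Run one job: await the handler (sync or async), then emit the outcome.
  public async process(job: SketchJob<T>): Promise<void> {
    let result: R;
    try {
      result = await this.handler(job);
    } catch (error) {
      this.worker.emit("failed", job, error instanceof Error ? error : new Error(String(error)));
      return;
    }
    this.worker.emit("completed", job, result);
  }
}

class UppercaseQueue extends SketchQueue<{ text: string }, string> {
  public readonly log: string[] = [];

  constructor() {
    super();
    this.registerEvents();
  }

  // A synchronous handler; returning a Promise would be processed the same way.
  public handler(job: SketchJob<{ text: string }>): string {
    if (job.data.text === "") throw new Error("empty text");
    return job.data.text.toUpperCase();
  }

  public onCompleted(job: SketchJob<{ text: string }>, result: string): void {
    this.log.push(`completed ${job.id}: ${result}`);
  }

  public onFailed(job: SketchJob<{ text: string }> | undefined, error: Error): void {
    this.log.push(`failed ${job?.id}: ${error.message}`);
  }
}
```

Because process awaits the handler's return value, the same code path covers both sides of Promise<R> | R. Hooks that a subclass does not declare are simply never attached.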

Environment variables

The Redis connection string comes from the injected AppEnv. Set it so queue classes can connect.
Variable | Required | Purpose
QUEUE_REDIS_URL | Yes | Redis connection string, e.g. redis://localhost:6379. If it is missing, constructing a queue throws QueueException (QUEUE_REDIS_URL_REQUIRED).
QUEUE_REDIS_URL=redis://localhost:6379
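Since a missing value only surfaces when a queue class is constructed, it can be useful to check the variable once at application startup. The following is a minimal sketch under stated assumptions: EnvError, requireQueueRedisUrl, and the QUEUE_REDIS_URL_INVALID key are hypothetical and not part of @ooneex/queue or @ooneex/app-env; only the QUEUE_REDIS_URL_REQUIRED key comes from this page.

```typescript
// Hypothetical helper for a fail-fast startup check; not a library API.
class EnvError extends Error {
  public readonly key: string;

  constructor(message: string, key: string) {
    super(message);
    this.name = "EnvError";
    this.key = key;
  }
}

type EnvLike = { QUEUE_REDIS_URL?: string };

function requireQueueRedisUrl(env: EnvLike): string {
  const raw = env.QUEUE_REDIS_URL?.trim();
  if (!raw) {
    throw new EnvError("QUEUE_REDIS_URL is not set.", "QUEUE_REDIS_URL_REQUIRED");
  }
  // Assumption: only redis:// and rediss:// (TLS) connection strings are accepted.
  if (!raw.startsWith("redis://") && !raw.startsWith("rediss://")) {
    throw new EnvError("QUEUE_REDIS_URL must start with redis:// or rediss://.", "QUEUE_REDIS_URL_INVALID");
  }
  return raw;
}
```

Calling requireQueueRedisUrl({ QUEUE_REDIS_URL: "redis://localhost:6379" }) returns the string unchanged, while an empty or absent value throws with the key QUEUE_REDIS_URL_REQUIRED.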

Decorator and usage

@decorator.queue()

Registers a queue class with the container. It accepts an optional scope (defaults to singleton). Apply it to a class that extends Queue.
import { decorator, Queue } from "@ooneex/queue";
import type { Job } from "bullmq";

export type EmailQueueDataType = { to: string; subject: string };
export type EmailQueueReturnType = unknown;

@decorator.queue()
export class EmailQueue extends Queue<EmailQueueDataType, EmailQueueReturnType> {
  // queue and worker are created in the constructor (see the generated stub).

  public async handler(job: Job<EmailQueueDataType, EmailQueueReturnType>): Promise<EmailQueueReturnType> {
    // sendEmail is an application-level helper that is not shown here.
    await sendEmail(job.data.to, job.data.subject);
    return job.data;
  }
}
Resolve it from the container, then enqueue jobs (producer). The worker (consumer) runs handler for each job:
import { container } from "@ooneex/container";
import { EmailQueue } from "@/queues/EmailQueue";

const emails = container.get(EmailQueue);

// Enqueue one job
await emails.add("welcome", { to: "john@example.com", subject: "Welcome" });

// Enqueue many at once
await emails.addBulk([
  { name: "welcome", data: { to: "a@example.com", subject: "Welcome" } },
  { name: "welcome", data: { to: "b@example.com", subject: "Welcome" } },
]);
Inject the queue where you produce jobs:
import { inject } from "@ooneex/container";
import { EmailQueue } from "@/queues/EmailQueue";

export class SignupService {
  constructor(@inject(EmailQueue) private readonly emails: EmailQueue) {}

  public async signup(email: string): Promise<void> {
    await this.emails.add("welcome", { to: email, subject: "Welcome" });
  }
}

Exceptions

The component throws QueueException when a queue is misconfigured. It carries a machine-readable key, a human-readable message, and a data object, and reports an Internal Server Error (HTTP 500) status.
Key | When
QUEUE_REDIS_URL_REQUIRED | A queue class is constructed without QUEUE_REDIS_URL set on AppEnv.
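import { container } from "@ooneex/container";
import { QueueException } from "@ooneex/queue";
import { EmailQueue } from "@/queues/EmailQueue";

// `email` and `logger` are assumed to be defined by the surrounding application code.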

try {
  const emails = container.get(EmailQueue);
  await emails.add("welcome", { to: email, subject: "Welcome" });
} catch (error) {
  if (error instanceof QueueException) {
    logger.error(`Queue error [${error.key}]: ${error.message}`, error.data);
  } else {
    throw error;
  }
}

Best practices

  • Keep job data small and serializable. Job data is a record of scalars (string, number, bigint, boolean); pass ids and look up the rest in the handler rather than embedding large payloads.
  • Make handlers idempotent. Jobs can be retried after a failure or a stall, so processing the same job twice must be safe.
  • Always attach onError. Without it, worker-level errors go unhandled, so implement onError (and usually onFailed) on every queue.
  • Retry transient failures. Configure attempts with exponential backoff so flaky downstream calls recover instead of dropping the job.
  • Bound Redis growth. Use removeOnComplete and removeOnFail so finished jobs don’t accumulate on a shared instance.
  • Tune concurrency and rate limits. Match concurrency and limiter to what the handler’s downstream resources can sustain.
  • Close on shutdown. Call close() to drain the worker and release Redis connections cleanly.
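One way to make a handler idempotent, as recommended in the list above, is to record a stable key for each processed job and skip work that has already been done. The sketch below assumes a hypothetical messageId field in the job data and uses an in-memory Set as a stand-in for a durable store (a database table or a Redis key); IdempotentEmailHandler and its methods are illustrative names, not part of @ooneex/queue.

```typescript
// Hypothetical names throughout. The Set stands in for a durable store that
// survives restarts and is shared by all worker instances.
type EmailJobData = { messageId: string; to: string; subject: string };

class IdempotentEmailHandler {
  private readonly processed = new Set<string>();
  public readonly sent: string[] = [];

  public async handle(data: EmailJobData): Promise<"sent" | "skipped"> {
    // A retried or stalled job arrives with the same messageId: do nothing.
    if (this.processed.has(data.messageId)) return "skipped";

    await this.send(data);
    // Mark the job as done only after the side effect succeeded.
    this.processed.add(data.messageId);
    return "sent";
  }

  // Placeholder for the real side effect (for example, an email API call).
  private async send(data: EmailJobData): Promise<void> {
    this.sent.push(`${data.to}: ${data.subject}`);
  }
}
```

Handling the same job data twice yields "sent" and then "skipped", with a single recorded send. The check and the mark are not atomic in this sketch, so a real store would need an atomic "insert if absent" operation to stay safe when several jobs run concurrently.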

CLI command

Scaffold a queue class and its test file with the generator. It writes the class under modules/<module>/src/queues/<Name>Queue.ts and installs @ooneex/queue if it is missing.
# Interactive: prompts for the name
ooneex queue:create

# Provide the name
ooneex queue:create --name=Email

# Target a module and overwrite
ooneex queue:create --name=Email --module=notifications --override
Option | Description | Default
--name | Queue class name. The Queue suffix is appended automatically. | Prompted if omitted
--module | Target module the class is generated into. | shared
--override | Overwrite an existing class without prompting. | false
The generated class is a ready-to-run stub: it builds the BullMQ Queue and Worker from QUEUE_REDIS_URL, registers its event hooks, and leaves a handler for you to implement.
import { AppEnv } from "@ooneex/app-env";
import { inject } from "@ooneex/container";
import { decorator, Queue, QueueException, type ScalarType } from "@ooneex/queue";
import { RedisClient } from "bun";
import { type BunRedisRawClient, createBunRedisClient, type Job, Queue as BullQueue, Worker } from "bullmq";

export type EmailQueueDataType = Record<string, ScalarType>;
export type EmailQueueReturnType = unknown;

@decorator.queue()
export class EmailQueue extends Queue<EmailQueueDataType, EmailQueueReturnType> {
  private readonly name = "Email";
  protected queue: BullQueue<EmailQueueDataType, EmailQueueReturnType>;
  protected worker: Worker<EmailQueueDataType, EmailQueueReturnType>;

  constructor(@inject(AppEnv) private readonly env: AppEnv) {
    super();

    if (!this.env.QUEUE_REDIS_URL) {
      throw new QueueException(
        "Queue Redis URL is required. Please set the QUEUE_REDIS_URL environment variable.",
        "QUEUE_REDIS_URL_REQUIRED",
      );
    }

    const rawClient = new RedisClient(this.env.QUEUE_REDIS_URL);
    const connection = createBunRedisClient(rawClient as BunRedisRawClient);

    this.queue = new BullQueue<EmailQueueDataType, EmailQueueReturnType>(this.name, {
      connection,
      defaultJobOptions: {
        attempts: 3,
        backoff: { type: "exponential", delay: 1000 },
        removeOnComplete: { age: 3600, count: 1000 },
        removeOnFail: { age: 24 * 3600, count: 5000 },
      },
    });

    this.worker = new Worker<EmailQueueDataType, EmailQueueReturnType>(
      this.name,
      (job) => this.handler(job),
      {
        connection,
        concurrency: 10,
        limiter: { max: 100, duration: 1000 },
        stalledInterval: 30_000,
        maxStalledCount: 3,
        lockDuration: 30_000,
      },
    );

    this.registerEvents();
  }

  public async handler(job: Job<EmailQueueDataType, EmailQueueReturnType>): Promise<EmailQueueReturnType> {
    // TODO: Implement the job processing logic for the "Email" queue.
    return job.data;
  }
}
See queue:create for the full command reference.
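The generated stub sets attempts: 3 with backoff: { type: "exponential", delay: 1000 }. To see what retry schedule that implies, the sketch below computes the waits between attempts, assuming BullMQ's documented exponential strategy of 2^(attemptsMade - 1) * delay milliseconds with no jitter. The function name exponentialBackoffDelays is hypothetical.

```typescript
// Assumes BullMQ's documented exponential strategy without jitter:
// delay before the next try = 2^(attemptsMade - 1) * delay.
function exponentialBackoffDelays(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // A job with `attempts` total tries is retried at most attempts - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(2 ** (attemptsMade - 1) * delayMs));
  }
  return delays;
}

// Values from the generated stub: 3 attempts, 1000 ms base delay.
const stubDelays = exponentialBackoffDelays(3, 1000); // [1000, 2000]
console.log(stubDelays);
```

Under that assumption, a job that keeps failing is retried after about 1 second and then after about 2 seconds before it is marked as failed. Raising attempts doubles each subsequent wait.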

Use with Claude and Codex

The generator ships a matching queue:create skill. It runs the scaffold and then guides your AI agent through completing the queue — implementing handler, adding typed add/addBulk wrappers for the jobs it carries, and ensuring QUEUE_REDIS_URL is set. Initialize the skills once for your agent:
ooneex claude:init
Then ask Claude in natural language. It maps the request to the generator, runs it, and fills in the implementation:
Prompt
Create a queue that processes outbound emails.
For example, the prompt above maps to queue:create --name=Email; the agent then implements the handler to send each queued email.
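The typed add/addBulk wrappers mentioned in this section could look roughly like the following. This is a sketch under stated assumptions: JobProducer mirrors only the add and addBulk signatures from the method table, and EmailJobs, InMemoryProducer, sendWelcome, and sendWelcomeToAll are hypothetical names used for illustration. In a real project the wrappers would more likely be methods on the generated queue class itself.

```typescript
// Hypothetical shapes: JobProducer mirrors only add/addBulk; it is not the real Queue class.
type EmailData = { to: string; subject: string };
type CreatedJob = { id: string };

interface JobProducer<T> {
  add(name: string, data: T): Promise<CreatedJob>;
  addBulk(jobs: { name: string; data: T }[]): Promise<CreatedJob[]>;
}

// Typed wrappers: callers never spell the job name or payload shape by hand.
class EmailJobs {
  private readonly producer: JobProducer<EmailData>;

  constructor(producer: JobProducer<EmailData>) {
    this.producer = producer;
  }

  public sendWelcome(to: string): Promise<CreatedJob> {
    return this.producer.add("welcome", { to, subject: "Welcome" });
  }

  public sendWelcomeToAll(recipients: string[]): Promise<CreatedJob[]> {
    return this.producer.addBulk(
      recipients.map((to) => ({ name: "welcome", data: { to, subject: "Welcome" } })),
    );
  }
}

// In-memory producer so the wrappers can be exercised without Redis.
class InMemoryProducer implements JobProducer<EmailData> {
  public readonly jobs: { name: string; data: EmailData }[] = [];

  public async add(name: string, data: EmailData): Promise<CreatedJob> {
    this.jobs.push({ name, data });
    return { id: String(this.jobs.length) };
  }

  public async addBulk(jobs: { name: string; data: EmailData }[]): Promise<CreatedJob[]> {
    const created: CreatedJob[] = [];
    for (const job of jobs) created.push(await this.add(job.name, job.data));
    return created;
  }
}
```

Keeping the job name and payload construction in one place means a change to the data type shows up as a compile error in the wrapper rather than at each call site.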