The @ooneex/queue component is a background job layer built on BullMQ. You extend the abstract Queue base class, declare the data your jobs carry, and implement a handler that processes each job. The base class wires the producer (add, addBulk, removeJob) and the consumer (a BullMQ Worker) together over a Redis connection, and forwards worker lifecycle events to optional hooks on your class.
Why this component
- One base class, producer and consumer. Extend Queue<T, R> and you get typed add, addBulk, removeJob, and close methods plus a worker that runs your handler.
- Type-safe jobs. The job data type T (a record of scalars) and the return type R flow through add and handler so producers and consumers stay in sync.
- Lifecycle hooks. Implement optional onCompleted, onFailed, onError, and other hooks; the base class binds them to the worker automatically.
- Redis-backed. Jobs persist in Redis through BullMQ, so they survive restarts and scale across worker instances.
- Container-managed. Register a queue class with a decorator and resolve it from the container.
How it works
A queue class extends Queue<T, R>, where T extends Record<string, ScalarType> is the job data and R is the value the handler returns. Inside the constructor you create a BullMQ Queue (the producer) and a Worker (the consumer) over a shared Redis connection, then call registerEvents() to bind your hooks.
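The way T and R flow from the producer to the handler can be sketched without Redis. This is a minimal in-memory stand-in, not the real base class: SketchQueue, SketchJob, ResizeQueue, and drain are hypothetical names, ScalarType is assumed to be the scalar union listed under Best practices, and the handler is kept synchronous to keep the sketch short.

```typescript
// Assumption: ScalarType is the union of scalars the docs list for job data.
type ScalarType = string | number | bigint | boolean;

// Hypothetical stand-in for a BullMQ job: only the fields this sketch needs.
interface SketchJob<T, R> {
  id: string;
  name: string;
  data: T;
  returnvalue?: R;
}

// Hypothetical in-memory model of the base class; the real one talks to Redis.
abstract class SketchQueue<T extends Record<string, ScalarType>, R> {
  private nextId = 1;
  protected readonly jobs: SketchJob<T, R>[] = [];

  // Producer side: the compiler rejects data that does not match T.
  add(name: string, data: T): SketchJob<T, R> {
    const job: SketchJob<T, R> = { id: String(this.nextId++), name, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: the subclass decides how a job becomes an R.
  abstract handler(job: SketchJob<T, R>): R;

  // Stand-in for the worker loop: run the handler on every queued job.
  drain(): R[] {
    return this.jobs.splice(0).map((job) => {
      const result = this.handler(job);
      job.returnvalue = result;
      return result;
    });
  }
}

type ResizeData = { imageId: string; width: number };

class ResizeQueue extends SketchQueue<ResizeData, string> {
  handler(job: SketchJob<ResizeData, string>): string {
    return `${job.data.imageId}@${job.data.width}`;
  }
}
```

Calling new ResizeQueue().add("resize", { imageId: "img-1", width: "wide" }) would fail to compile because width must be a number, which is the producer and consumer staying in sync through T.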
The producer side enqueues work:
| Method | Purpose |
|---|---|
| add(name, data, opts?) | Enqueue one job; returns the created Job<T, R>. |
| addBulk(jobs) | Enqueue many jobs at once; returns the created jobs. |
| removeJob(id) | Remove a job by id; returns the number removed. |
| close() | Close the worker and the queue connections. |
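The return values in the table can be illustrated with an in-memory model. InMemoryProducer, StoredJob, and demoProducer are hypothetical names, and the methods are assumed to return promises as the underlying BullMQ calls do; the real methods store jobs in Redis rather than in a Map.

```typescript
// Hypothetical in-memory producer; the real methods go through BullMQ and Redis.
type Scalar = string | number | bigint | boolean;
type JobData = Record<string, Scalar>;

interface StoredJob<T extends JobData> {
  id: string;
  name: string;
  data: T;
}

class InMemoryProducer<T extends JobData> {
  private readonly jobs = new Map<string, StoredJob<T>>();
  private nextId = 1;

  // Enqueue one job and return it.
  async add(name: string, data: T): Promise<StoredJob<T>> {
    const job: StoredJob<T> = { id: String(this.nextId++), name, data };
    this.jobs.set(job.id, job);
    return job;
  }

  // Enqueue many jobs in order and return all of them.
  async addBulk(jobs: { name: string; data: T }[]): Promise<StoredJob<T>[]> {
    const created: StoredJob<T>[] = [];
    for (const { name, data } of jobs) {
      created.push(await this.add(name, data));
    }
    return created;
  }

  // Remove by id and report how many jobs were removed (0 or 1).
  async removeJob(id: string): Promise<number> {
    return this.jobs.delete(id) ? 1 : 0;
  }

  get size(): number {
    return this.jobs.size;
  }
}

// Usage: enqueue three emails, then cancel the first one.
async function demoProducer(): Promise<{ removed: number; missing: number; left: number }> {
  const producer = new InMemoryProducer<{ to: string }>();
  const first = await producer.add("welcome", { to: "a@example.com" });
  await producer.addBulk([
    { name: "welcome", data: { to: "b@example.com" } },
    { name: "welcome", data: { to: "c@example.com" } },
  ]);
  const removed = await producer.removeJob(first.id);
  const missing = await producer.removeJob("no-such-id");
  return { removed, missing, left: producer.size };
}
```

In this model, removing an unknown id resolves to 0 rather than throwing, so callers can treat removal as safe to repeat.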
The consumer side is the handler you implement. Its return type, QueueHandlerReturnType<R>, is Promise<R> | R, so the handler may be sync or async. The worker invokes it for each job and the returned value becomes the job result (surfaced to onCompleted).
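A short sketch of why Promise<R> | R lets a handler be written either way: awaiting a plain value yields that value, so a caller can treat both forms the same. The type alias restates the definition from the text; countWords, countWordsLater, and runHandler are hypothetical names, with runHandler standing in for the worker.

```typescript
// QueueHandlerReturnType<R> as described in the text: a value or a promise of one.
type QueueHandlerReturnType<R> = Promise<R> | R;

// Two hypothetical handlers with the same result type: one sync, one async.
const countWords = (text: string): QueueHandlerReturnType<number> =>
  text.split(/\s+/).filter(Boolean).length;

const countWordsLater = async (text: string): Promise<number> =>
  text.split(/\s+/).filter(Boolean).length;

// Stand-in for the worker: await works for both a plain value and a promise.
async function runHandler(
  handler: (text: string) => QueueHandlerReturnType<number>,
  text: string,
): Promise<number> {
  return await handler(text);
}
```

Both runHandler(countWords, "a b c") and runHandler(countWordsLater, "a b c") resolve to the same number, so switching a handler from sync to async does not change its callers.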
Worker lifecycle events are optional hooks on IQueue. Declare any of them and registerEvents() attaches them:
| Hook | Fires when |
|---|---|
| onActive(job, prev) | A job moved to active and started processing. |
| onCompleted(job, result, prev) | A job finished successfully; result is the handler’s return value. |
| onFailed(job, error, prev) | A job threw during processing (job may be undefined). |
| onProgress(job, progress) | A job reported progress via job.updateProgress. |
| onStalled(jobId, prev) | A job was detected as stalled and is about to be retried. |
| onDrained() | The queue had no more jobs to process. |
| onPaused() / onResumed() | The worker was paused or resumed. |
| onClosing(message) / onClosed() | The worker is closing or has closed. |
| onReady() | The worker connected to Redis and is ready. |
| onError(error) | The worker hit an error unrelated to a specific job. Always attach this to avoid unhandled exceptions. |
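One way to picture how optional hooks get bound is the sketch below. It is an assumption about the mechanism, not the component's actual registerEvents(): TinyEmitter stands in for the BullMQ Worker (which emits events such as completed, failed, error, and drained), and Hooks, EVENT_TO_HOOK, and EmailHooks are hypothetical names covering only a subset of the table.

```typescript
// Minimal emitter standing in for the BullMQ Worker.
type Listener = (...args: any[]) => void;

class TinyEmitter {
  private readonly listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args);
  }

  listenerCount(event: string): number {
    return (this.listeners.get(event) ?? []).length;
  }
}

// A subset of the optional hooks from the table.
interface Hooks {
  onCompleted?(job: unknown, result: unknown): void;
  onFailed?(job: unknown, error: Error): void;
  onError?(error: Error): void;
  onDrained?(): void;
}

// Assumed mapping from worker event names to hook names.
const EVENT_TO_HOOK: [string, keyof Hooks][] = [
  ["completed", "onCompleted"],
  ["failed", "onFailed"],
  ["error", "onError"],
  ["drained", "onDrained"],
];

// Attach only the hooks the instance actually declares.
function registerEvents(worker: TinyEmitter, target: Hooks): void {
  for (const [event, hook] of EVENT_TO_HOOK) {
    const fn = target[hook] as Listener | undefined;
    if (fn) worker.on(event, (...args: unknown[]) => fn.apply(target, args));
  }
}

class EmailHooks implements Hooks {
  readonly log: string[] = [];
  onCompleted(_job: unknown, result: unknown): void {
    this.log.push(`completed:${String(result)}`);
  }
  onError(error: Error): void {
    this.log.push(`error:${error.message}`);
  }
}

const sketchWorker = new TinyEmitter();
const emailHooks = new EmailHooks();
registerEvents(sketchWorker, emailHooks);
sketchWorker.emit("completed", { id: "1" }, "sent");
sketchWorker.emit("error", new Error("connection lost"));
sketchWorker.emit("failed", undefined, new Error("boom")); // no onFailed declared, so nothing is bound
```

Because EmailHooks declares no onFailed, the failed event has no listener in this model, which is why declaring onError and usually onFailed on every queue matters.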
Environment variables
The Redis connection string comes from the injected AppEnv. Set it so queue classes can connect.
| Variable | Required | Purpose |
|---|---|---|
| QUEUE_REDIS_URL | Yes | Redis connection string, e.g. redis://localhost:6379. Missing throws QueueException (QUEUE_REDIS_URL_REQUIRED). |
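The fail-fast check on the variable might look like the sketch below. SketchQueueException, SketchEnv, and requireRedisUrl are hypothetical stand-ins: the real QueueException and AppEnv may be shaped differently, and the exception fields here follow only the description in the Exceptions section.

```typescript
// Hypothetical stand-in for QueueException (key, message, data, status).
class SketchQueueException extends Error {
  readonly status = 500; // Internal Server Error
  readonly key: string;
  readonly data: Record<string, unknown>;

  constructor(message: string, key: string, data: Record<string, unknown> = {}) {
    super(message);
    this.name = "SketchQueueException";
    this.key = key;
    this.data = data;
  }
}

// Assumption: the environment exposes the variable as an optional string.
interface SketchEnv {
  QUEUE_REDIS_URL?: string;
}

// Return the URL, or fail fast with the documented key when it is missing or blank.
function requireRedisUrl(env: SketchEnv): string {
  const url = env.QUEUE_REDIS_URL?.trim();
  if (!url) {
    throw new SketchQueueException(
      "QUEUE_REDIS_URL is required to create a queue",
      "QUEUE_REDIS_URL_REQUIRED",
    );
  }
  return url;
}
```

Checking the key rather than the message text keeps error handling stable if the wording of the message changes.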
Decorator and usage
@decorator.queue()
Registers a queue class with the container. It accepts an optional scope (defaults to singleton). Apply it to a class that extends Queue.
Resolve the queue from the container and enqueue jobs; the worker runs handler for each.
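What registration with a singleton default means can be sketched with a toy container. Everything here is hypothetical: SketchContainer, the queue factory, the "transient" scope name, and EmailQueue are illustrations, not the real container or decorator. The factory is applied by calling it directly so the sketch does not depend on a particular decorator compiler setting; with the real component you would use the @decorator.queue() syntax.

```typescript
// Assumption: "transient" is a made-up second scope, used only for contrast.
type Scope = "singleton" | "transient";
type Ctor<T> = new () => T;

// Hypothetical container: remembers each class's scope and caches singletons.
class SketchContainer {
  private readonly scopes = new Map<Ctor<unknown>, Scope>();
  private readonly singletons = new Map<Ctor<unknown>, unknown>();

  register(ctor: Ctor<unknown>, scope: Scope): void {
    this.scopes.set(ctor, scope);
  }

  get<T>(ctor: Ctor<T>): T {
    const scope = this.scopes.get(ctor);
    if (!scope) throw new Error(`${ctor.name} is not registered`);
    if (scope === "transient") return new ctor();
    if (!this.singletons.has(ctor)) this.singletons.set(ctor, new ctor());
    return this.singletons.get(ctor) as T;
  }
}

const sketchContainer = new SketchContainer();

// Stand-in for decorator.queue(): a factory returning a class decorator.
function queue(scope: Scope = "singleton") {
  return (target: Ctor<unknown>): void => sketchContainer.register(target, scope);
}

// Stand-in queue class; a real one would extend Queue and talk to Redis.
class EmailQueue {
  readonly enqueued: string[] = [];
  add(to: string): void {
    this.enqueued.push(to);
  }
}

// Equivalent to decorating the class, written as a plain call.
queue()(EmailQueue);

// Both lookups return the same instance under the singleton default.
const producerSide = sketchContainer.get(EmailQueue);
const otherLookup = sketchContainer.get(EmailQueue);
producerSide.add("a@example.com");
```

A singleton default fits queues well in this model, since each instance would otherwise open its own worker and Redis connections.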
Exceptions
The component throws QueueException when a queue is misconfigured. It carries a machine-readable key, a human-readable message, and a data object, and reports an Internal Server Error status.
| Key | When |
|---|---|
| QUEUE_REDIS_URL_REQUIRED | A queue class is constructed without QUEUE_REDIS_URL set on AppEnv. |
Best practices
- Keep job data small and serializable. Job data is a record of scalars (string, number, bigint, boolean); pass ids and look up the rest in the handler rather than embedding large payloads.
- Make handlers idempotent. Jobs can be retried after a failure or a stall, so processing the same job twice must be safe.
- Always attach onError. Without it, worker-level errors go unhandled; implement onError (and usually onFailed) on every queue.
- Retry transient failures. Configure attempts with exponential backoff so flaky downstream calls recover instead of dropping the job.
- Bound Redis growth. Use removeOnComplete and removeOnFail so finished jobs don’t accumulate on a shared instance.
- Tune concurrency and rate limits. Match concurrency and limiter to what the handler’s downstream resources can sustain.
- Close on shutdown. Call close() to drain the worker and release Redis connections cleanly.
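The retry, cleanup, and throughput settings named above correspond to BullMQ job and worker options. The sketch below uses local types that mirror those option names so it compiles without the bullmq package; the specific values are illustrative, and it is an assumption that add's opts accepts the job options and that concurrency and limiter are passed where the queue class creates its Worker. retryDelays is a hypothetical helper showing the schedule that exponential backoff produces.

```typescript
// Local types mirroring the BullMQ option names used below.
interface SketchJobOptions {
  attempts: number; // total tries, including the first
  backoff: { type: "exponential" | "fixed"; delay: number }; // delay in ms
  removeOnComplete: { count: number }; // keep at most this many completed jobs
  removeOnFail: { age: number }; // keep failed jobs for this many seconds
}

interface SketchWorkerOptions {
  concurrency: number; // jobs processed in parallel by one worker
  limiter: { max: number; duration: number }; // at most `max` jobs per `duration` ms
}

const jobOptions: SketchJobOptions = {
  attempts: 5,
  backoff: { type: "exponential", delay: 1000 },
  removeOnComplete: { count: 1000 },
  removeOnFail: { age: 24 * 60 * 60 },
};

const workerOptions: SketchWorkerOptions = {
  concurrency: 5,
  limiter: { max: 50, duration: 1000 },
};

// Delay before each retry under exponential backoff: delay * 2^(retry - 1).
function retryDelays(options: SketchJobOptions): number[] {
  const retries = options.attempts - 1; // the first attempt is not a retry
  return Array.from({ length: retries }, (_, i) => options.backoff.delay * 2 ** i);
}
```

With these values a job that keeps failing is retried after 1, 2, 4, and 8 seconds before it is marked failed, and the handler must be idempotent for those retries to be safe.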
CLI command
Scaffold a queue class and its test file with the generator. It writes the class under modules/<module>/src/queues/<Name>Queue.ts and installs @ooneex/queue if it is missing.
| Option | Description | Default |
|---|---|---|
| --name | Queue class name. The Queue suffix is appended automatically. | Prompted if omitted |
| --module | Target module the class is generated into. | shared |
| --override | Overwrite an existing class without prompting. | false |
The generated class creates its BullMQ Queue and Worker from QUEUE_REDIS_URL, registers its event hooks, and leaves a handler for you to implement.
Use with Claude and Codex
The generator ships a matching queue:create skill. It runs the scaffold and then guides your AI agent through completing the queue: implementing handler, adding typed add/addBulk wrappers for the jobs it carries, and ensuring QUEUE_REDIS_URL is set. Initialize the skills once for your agent:
- Claude
- Codex
Prompt
The agent runs queue:create --name=Email, then implements the handler to send each queued email.