# Queue

> Enqueue and process background jobs with a BullMQ-backed Queue base class wired to Redis.

The `@ooneex/queue` component is a background job layer built on [BullMQ](https://docs.bullmq.io). You extend the abstract `Queue` base class, declare the data your jobs carry, and implement a `handler` that processes each job. The base class wires the producer (`add`, `addBulk`, `removeJob`) and the consumer (a BullMQ `Worker`) together over a Redis connection, and forwards worker lifecycle events to optional hooks on your class.

## Why this component

* **One base class, producer and consumer.** Extend `Queue<T, R>` and you get typed `add`, `addBulk`, `removeJob`, and `close` methods plus a worker that runs your `handler`.
* **Type-safe jobs.** The job data type `T` (a record of scalars) and the return type `R` flow through `add` and `handler` so producers and consumers stay in sync.
* **Lifecycle hooks.** Implement optional `onCompleted`, `onFailed`, `onError`, and other hooks; the base class binds them to the worker automatically.
* **Redis-backed.** Jobs persist in Redis through BullMQ, so they survive restarts and scale across worker instances.
* **Container-managed.** Register a queue class with a decorator and resolve it from the container.

## How it works

A queue class extends `Queue<T, R>`, where `T extends Record<string, ScalarType>` is the job data and `R` is the value the handler returns. Inside the constructor you create a BullMQ `Queue` (the producer) and a `Worker` (the consumer) over a shared Redis connection, then call `registerEvents()` to bind your hooks.
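
The constraint on `T` can be illustrated with a small sketch. It assumes `ScalarType` covers the four scalars this page lists (`string`, `number`, `bigint`, `boolean`); the real export may differ, and `isScalarRecord` is a hypothetical helper rather than part of `@ooneex/queue`.

```typescript
// Assumption: ScalarType mirrors the scalars named on this page.
type ScalarType = string | number | bigint | boolean;
type JobData = Record<string, ScalarType>;

// A `type` alias whose fields are all scalars satisfies the constraint.
type EmailData = { to: string; subject: string };

// Hypothetical runtime check: every value in the payload must be a scalar.
function isScalarRecord(value: Record<string, unknown>): value is JobData {
  return Object.keys(value).every((key) => {
    const kind = typeof value[key];
    return kind === "string" || kind === "number" || kind === "bigint" || kind === "boolean";
  });
}

// Compile-time check: EmailData is assignable to Record<string, ScalarType>.
const sample: EmailData = { to: "john@example.com", subject: "Welcome" };
const asJobData: JobData = sample;

const okPayload = isScalarRecord(asJobData);
// An array value is not a scalar, so this payload is rejected.
const badPayload = isScalarRecord({ to: "john@example.com", attachments: ["a.pdf"] });
```

Declaring job data with `type` rather than `interface` matters here: TypeScript gives object type aliases an implicit index signature, so they are assignable to `Record<string, ScalarType>`, while an `interface` with the same fields is not.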

The **producer** side enqueues work:

| Method                   | Purpose                                              |
| ------------------------ | ---------------------------------------------------- |
| `add(name, data, opts?)` | Enqueue one job; returns the created `Job<T, R>`.    |
| `addBulk(jobs)`          | Enqueue many jobs at once; returns the created jobs. |
| `removeJob(id)`          | Remove a job by id; returns the number removed.      |
| `close()`                | Close the worker and the queue connections.          |

The **consumer** side processes work. You implement the abstract `handler`:

```typescript theme={null}
public abstract handler(job: Job<T, R>): QueueHandlerReturnType<R>;
```

`QueueHandlerReturnType<R>` is `Promise<R> | R`, so the handler may be sync or async. The worker invokes it for each job and the returned value becomes the job result (surfaced to `onCompleted`).
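
As a rough illustration of why both shapes work, the sketch below runs a synchronous and an asynchronous handler through the same caller. The `run` function and both handlers are hypothetical; this is not how the worker is implemented, only a demonstration of the `Promise<R> | R` union.

```typescript
// Mirrors the alias described above.
type QueueHandlerReturnType<R> = Promise<R> | R;

// Hypothetical handlers: one synchronous, one asynchronous.
const syncHandler = (n: number): QueueHandlerReturnType<number> => n * 2;
const asyncHandler = async (n: number): Promise<number> => n * 2;

// An async function resolves plain values and promises alike,
// so one caller covers both handler shapes.
async function run<R>(handler: () => QueueHandlerReturnType<R>): Promise<R> {
  return handler();
}

const results: Promise<number[]> = Promise.all([
  run(() => syncHandler(21)),
  run(() => asyncHandler(21)),
]);
```

Wrapping the call in an async function also turns a synchronous `throw` into a rejected promise, so both kinds of handler fail in the same way.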

Worker lifecycle events are optional hooks on `IQueue`. Declare any of them and `registerEvents()` attaches them:

| Hook                                | Fires when                                                                                             |
| ----------------------------------- | ------------------------------------------------------------------------------------------------------ |
| `onActive(job, prev)`               | A job moved to active and started processing.                                                          |
| `onCompleted(job, result, prev)`    | A job finished successfully; `result` is the handler's return value.                                   |
| `onFailed(job, error, prev)`        | A job threw during processing (`job` may be undefined).                                                |
| `onProgress(job, progress)`         | A job reported progress via `job.updateProgress`.                                                      |
| `onStalled(jobId, prev)`            | A job was detected as stalled and is about to be retried.                                              |
| `onDrained()`                       | The queue had no more jobs to process.                                                                 |
| `onPaused()` / `onResumed()`        | The worker was paused or resumed.                                                                      |
| `onClosing(message)` / `onClosed()` | The worker is closing or has closed.                                                                   |
| `onReady()`                         | The worker connected to Redis and is ready.                                                            |
| `onError(error)`                    | The worker hit an error unrelated to a specific job. Always attach this to avoid unhandled exceptions. |
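
Conceptually, binding optional hooks means attaching a listener only when the class declares the matching method. The sketch below shows that idea with a tiny in-memory emitter. `TinyEmitter`, `bindHooks`, and `DemoQueue` are hypothetical names, and the real `registerEvents()` may work differently.

```typescript
type Listener = (...args: unknown[]) => void;

// Minimal stand-in for an event emitter such as a worker.
class TinyEmitter {
  private readonly listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): void {
    const current = this.listeners.get(event) ?? [];
    this.listeners.set(event, [...current, listener]);
  }

  emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args);
  }

  listenerCount(event: string): number {
    return (this.listeners.get(event) ?? []).length;
  }
}

// Event name paired with hook name (a subset of the table above).
const HOOKS: ReadonlyArray<[string, string]> = [
  ["completed", "onCompleted"],
  ["failed", "onFailed"],
  ["error", "onError"],
];

// Attach only the hooks the target actually declares.
function bindHooks(emitter: TinyEmitter, target: object): void {
  for (const [event, hookName] of HOOKS) {
    const hook = (target as Record<string, unknown>)[hookName];
    if (typeof hook === "function") {
      emitter.on(event, (...args) => hook.apply(target, args));
    }
  }
}

// Declares onCompleted and onError, but no onFailed.
class DemoQueue {
  public readonly seen: string[] = [];

  public onCompleted(_job: unknown, result: unknown): void {
    this.seen.push(`completed:${String(result)}`);
  }

  public onError(error: unknown): void {
    this.seen.push(`error:${String(error)}`);
  }
}

const emitter = new TinyEmitter();
const demo = new DemoQueue();
bindHooks(emitter, demo);
emitter.emit("completed", { id: "1" }, "sent");
emitter.emit("failed", { id: "2" }, new Error("boom")); // no listener attached
```

Because `DemoQueue` has no `onFailed`, the `failed` event has no listener and is silently ignored, which is why declaring `onError` and `onFailed` explicitly is worthwhile.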

## Environment variables

The Redis connection string comes from the injected `AppEnv`. Set it so queue classes can connect.

| Variable          | Required | Purpose                                                                                                                                               |
| ----------------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| `QUEUE_REDIS_URL` | Yes      | Redis connection string, e.g. `redis://localhost:6379`. If it is missing, the queue constructor throws `QueueException` (`QUEUE_REDIS_URL_REQUIRED`). |

```bash theme={null}
QUEUE_REDIS_URL=redis://localhost:6379
```

## Decorator and usage

### `@decorator.queue()`

Registers a queue class with the container. It accepts an optional scope (defaults to singleton). Apply it to a class that extends `Queue`.

```typescript theme={null}
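import { decorator, Queue } from "@ooneex/queue";
import type { Job } from "bullmq";

// Hypothetical mailer from your application code; not part of @ooneex/queue.
declare function sendEmail(to: string, subject: string): Promise<void>;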

export type EmailQueueDataType = { to: string; subject: string };
export type EmailQueueReturnType = unknown;

@decorator.queue()
export class EmailQueue extends Queue<EmailQueueDataType, EmailQueueReturnType> {
  // queue and worker are created in the constructor (see the generated stub).

  public async handler(job: Job<EmailQueueDataType, EmailQueueReturnType>): Promise<EmailQueueReturnType> {
    await sendEmail(job.data.to, job.data.subject);
    return job.data;
  }
}
```

Resolve it from the container, then enqueue jobs (producer). The worker (consumer) runs `handler` for each job:

```typescript theme={null}
import { container } from "@ooneex/container";
import { EmailQueue } from "@/queues/EmailQueue";

const emails = container.get(EmailQueue);

// Enqueue one job
await emails.add("welcome", { to: "john@example.com", subject: "Welcome" });

// Enqueue many at once
await emails.addBulk([
  { name: "welcome", data: { to: "a@example.com", subject: "Welcome" } },
  { name: "welcome", data: { to: "b@example.com", subject: "Welcome" } },
]);
```
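
When the recipient list comes from user input, it can help to build the `addBulk` payload with a small pure function. The sketch below is one way to do that. `toWelcomeJobs` is a hypothetical helper, and lowercasing addresses for de-duplication is an assumption that may not suit every application.

```typescript
type EmailQueueDataType = { to: string; subject: string };

// Shape of each `addBulk` entry, as used in the example above.
type BulkEntry = { name: string; data: EmailQueueDataType };

// Hypothetical helper: build one "welcome" job per unique, non-empty recipient.
function toWelcomeJobs(recipients: string[]): BulkEntry[] {
  const normalized = recipients.map((recipient) => recipient.trim().toLowerCase());
  const unique = Array.from(new Set(normalized));
  return unique
    .filter((to) => to.length > 0)
    .map((to) => ({ name: "welcome", data: { to, subject: "Welcome" } }));
}

const jobs = toWelcomeJobs(["a@example.com", "B@example.com ", "a@example.com", ""]);
// Then, with a resolved queue instance: await emails.addBulk(jobs);
```

Keeping the payload construction separate from the enqueue call makes it easy to unit test without a Redis connection.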

Inject the queue where you produce jobs:

```typescript theme={null}
import { inject } from "@ooneex/container";
import { EmailQueue } from "@/queues/EmailQueue";

export class SignupService {
  constructor(@inject(EmailQueue) private readonly emails: EmailQueue) {}

  public async signup(email: string): Promise<void> {
    await this.emails.add("welcome", { to: email, subject: "Welcome" });
  }
}
```

## Exceptions

The component throws `QueueException` when a queue is misconfigured. It carries a machine-readable `key`, a human-readable `message`, and a `data` object, and reports an Internal Server Error status.

| Key                        | When                                                                    |
| -------------------------- | ----------------------------------------------------------------------- |
| `QUEUE_REDIS_URL_REQUIRED` | A queue class is constructed without `QUEUE_REDIS_URL` set on `AppEnv`. |

```typescript theme={null}
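import { container } from "@ooneex/container";
import { QueueException } from "@ooneex/queue";
import { EmailQueue } from "@/queues/EmailQueue";

// `email` and `logger` are assumed to be in scope.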

try {
  const emails = container.get(EmailQueue);
  await emails.add("welcome", { to: email, subject: "Welcome" });
} catch (error) {
  if (error instanceof QueueException) {
    logger.error(`Queue error [${error.key}]: ${error.message}`, error.data);
  } else {
    throw error;
  }
}
```

## Best practices

* **Keep job data small and serializable.** Job data is a record of scalars (`string`, `number`, `bigint`, `boolean`); pass ids and look up the rest in the handler rather than embedding large payloads.
* **Make handlers idempotent.** Jobs can be retried after a failure or a stall, so processing the same job twice must be safe.
* **Always attach `onError`.** Without it, worker-level errors go unhandled, so implement `onError` (and usually `onFailed`) on every queue.
* **Retry transient failures.** Configure `attempts` with exponential backoff so flaky downstream calls recover instead of dropping the job.
* **Bound Redis growth.** Use `removeOnComplete` and `removeOnFail` so finished jobs don't accumulate on a shared instance.
* **Tune concurrency and rate limits.** Match `concurrency` and `limiter` to what the handler's downstream resources can sustain.
* **Close on shutdown.** Call `close()` to drain the worker and release Redis connections cleanly.
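
To make the idempotency advice concrete, here is a minimal sketch of a handler body that skips work a previous attempt already finished. The in-memory `Set` stands in for a durable store, and `handleOnce` and the key format are hypothetical.

```typescript
// In-memory stand-in for a durable store (a database table or Redis set in practice).
const processed = new Set<string>();
const sent: string[] = [];

type WelcomeJob = { id: string; data: { to: string; subject: string } };

// Hypothetical handler body: do the side effect at most once per recipient.
function handleOnce(job: WelcomeJob): "sent" | "skipped" {
  const key = `welcome:${job.data.to}`;
  if (processed.has(key)) return "skipped";
  sent.push(job.data.to); // the side effect that must not repeat
  processed.add(key);
  return "sent";
}

const first = handleOnce({ id: "1", data: { to: "john@example.com", subject: "Welcome" } });
// A retry of the same job finds the key and does nothing.
const retry = handleOnce({ id: "1", data: { to: "john@example.com", subject: "Welcome" } });
```

With several worker instances, the check and the write would need to be a single atomic operation in the shared store; this sketch only shows the control flow.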

## CLI command

Scaffold a queue class and its test file with the generator. It writes the class under `modules/<module>/src/queues/<Name>Queue.ts` and installs `@ooneex/queue` if it is missing.

```bash theme={null}
# Interactive: prompts for the name
ooneex queue:create

# Provide the name
ooneex queue:create --name=Email

# Target a module and overwrite
ooneex queue:create --name=Email --module=notifications --override
```

| Option       | Description                                                     | Default             |
| ------------ | --------------------------------------------------------------- | ------------------- |
| `--name`     | Queue class name. The `Queue` suffix is appended automatically. | Prompted if omitted |
| `--module`   | Target module the class is generated into.                      | `shared`            |
| `--override` | Overwrite an existing class without prompting.                  | `false`             |

The generated class is a ready-to-run stub: it builds the BullMQ `Queue` and `Worker` from `QUEUE_REDIS_URL`, registers its event hooks, and leaves a `handler` for you to implement.

```typescript theme={null}
import { AppEnv } from "@ooneex/app-env";
import { inject } from "@ooneex/container";
import { decorator, Queue, QueueException, type ScalarType } from "@ooneex/queue";
import { RedisClient } from "bun";
import { type BunRedisRawClient, createBunRedisClient, type Job, Queue as BullQueue, Worker } from "bullmq";

export type EmailQueueDataType = Record<string, ScalarType>;
export type EmailQueueReturnType = unknown;

@decorator.queue()
export class EmailQueue extends Queue<EmailQueueDataType, EmailQueueReturnType> {
  private readonly name = "Email";
  protected queue: BullQueue<EmailQueueDataType, EmailQueueReturnType>;
  protected worker: Worker<EmailQueueDataType, EmailQueueReturnType>;

  constructor(@inject(AppEnv) private readonly env: AppEnv) {
    super();

    if (!this.env.QUEUE_REDIS_URL) {
      throw new QueueException(
        "Queue Redis URL is required. Please set the QUEUE_REDIS_URL environment variable.",
        "QUEUE_REDIS_URL_REQUIRED",
      );
    }

    const rawClient = new RedisClient(this.env.QUEUE_REDIS_URL);
    const connection = createBunRedisClient(rawClient as BunRedisRawClient);

    this.queue = new BullQueue<EmailQueueDataType, EmailQueueReturnType>(this.name, {
      connection,
      defaultJobOptions: {
        attempts: 3,
        backoff: { type: "exponential", delay: 1000 },
        removeOnComplete: { age: 3600, count: 1000 },
        removeOnFail: { age: 24 * 3600, count: 5000 },
      },
    });

    this.worker = new Worker<EmailQueueDataType, EmailQueueReturnType>(
      this.name,
      (job) => this.handler(job),
      {
        connection,
        concurrency: 10,
        limiter: { max: 100, duration: 1000 },
        stalledInterval: 30_000,
        maxStalledCount: 3,
        lockDuration: 30_000,
      },
    );

    this.registerEvents();
  }

  public async handler(job: Job<EmailQueueDataType, EmailQueueReturnType>): Promise<EmailQueueReturnType> {
    // TODO: Implement the job processing logic for the "Email" queue.
    return job.data;
  }
}
```
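
To get a feel for the stub's retry defaults, the sketch below computes the waits implied by `attempts: 3` and `backoff: { type: "exponential", delay: 1000 }`. It assumes the exponential strategy waits `delay * 2^(attemptsMade - 1)` before each retry, which is worth checking against the BullMQ version you run. `retryDelays` is a hypothetical helper.

```typescript
// Assumption: exponential backoff waits delay * 2^(attemptsMade - 1) before each retry.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job allowed `attempts` total tries gets `attempts - 1` retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(baseDelayMs * Math.pow(2, attemptsMade - 1));
  }
  return delays;
}

const stubDefaults = retryDelays(3, 1000); // waits before the 2nd and 3rd attempt
const fiveAttempts = retryDelays(5, 1000);
```

Under that assumption, the stub retries after about 1 second and then 2 seconds before the job is marked failed. Raising `attempts` to 5 would add waits of 4 and 8 seconds.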

See [queue:create](/cli/commands/queue-create) for the full command reference.

## Use with Claude and Codex

The generator ships a matching `queue:create` skill. It runs the scaffold and then guides your AI agent through completing the queue: implementing `handler`, adding typed `add`/`addBulk` wrappers for the jobs it carries, and ensuring `QUEUE_REDIS_URL` is set. Initialize the skills once for your agent:

<Tabs>
  <Tab title="Claude">
    ```bash theme={null}
    ooneex claude:init
    ```

    Then ask Claude in natural language. It maps the request to the generator, runs it, and fills in the implementation:

    ```text Prompt icon="terminal" wrap theme={null}
    Create a queue that processes outbound emails.
    ```
  </Tab>

  <Tab title="Codex">
    ```bash theme={null}
    ooneex codex:init
    ```

    Then ask Codex in natural language. It maps the request to the generator, runs it, and fills in the implementation:

    ```text Prompt icon="terminal" wrap theme={null}
    Create a queue that processes outbound emails.
    ```
  </Tab>
</Tabs>

For example, the agent maps the prompt above to `queue:create --name=Email`, then implements `handler` to send each queued email.
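
The typed `add` wrappers mentioned above could take roughly the following shape. This is only a sketch: `Producer`, `EmailJobs`, and `fakeQueue` are hypothetical names, and in a real project the wrapper methods would more likely live on the queue class itself and call its inherited `add`.

```typescript
type EmailQueueDataType = { to: string; subject: string };
type RecordedJob = { name: string; data: EmailQueueDataType };

// Minimal stand-in for the producer side of a queue class (hypothetical).
interface Producer<T> {
  add(name: string, data: T): Promise<{ name: string; data: T }>;
}

// Hypothetical wrapper: one method per job kind, so callers never
// spell out job names or subjects themselves.
class EmailJobs {
  private readonly queue: Producer<EmailQueueDataType>;

  constructor(queue: Producer<EmailQueueDataType>) {
    this.queue = queue;
  }

  public welcome(to: string): Promise<RecordedJob> {
    return this.queue.add("welcome", { to, subject: "Welcome" });
  }
}

// In-memory producer so the sketch runs without Redis.
const recorded: RecordedJob[] = [];
const fakeQueue: Producer<EmailQueueDataType> = {
  add(name, data) {
    const job = { name, data };
    recorded.push(job);
    return Promise.resolve(job);
  },
};

const pending = new EmailJobs(fakeQueue).welcome("john@example.com");
```

Centralizing job names and default fields in wrapper methods keeps producers consistent with what `handler` expects.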
