← all parts

Part · jobs.queue

What's actually behind jobs.queue

The part exactly as partkit add jobs.queue vendors it into your repo — verified, locked, every byte readable. Nothing here is mocked.

jobs.queuev1.1.0

✓ attested🔒 read-onlyjobs.queue@1jobs.cron@1

Lives at parts/jobs.queue/ in your repo — open, owned, readable. Not buried in node_modules. 572 lines of source you can audit.

13 conformance tests passedverified 2026-06-15↗ CI run
content hash ed9ddd78aa…57587bpinned in parts.lockctrlai guard fails CI if a single byte changes
tested against node 25.3.0graphile-worker 0.16.6

Public API — what your seam calls

  • jobs(db: SqlExecutor): Jobs
  • Jobs { enqueue(input: EnqueueInput): Promise<EnqueuedJob>; listFailed(filter?: FailedFilter): Promise<FailedJob[]> }
  • runWorker(config: WorkerConfig): Promise<RunningWorker>
  • drainOnce(config: WorkerConfig): Promise<void>
  • class JobsError extends Error { code: JobsErrorCode }
  • types: SqlExecutor, EnqueueInput, EnqueuedJob, FailedJob, FailedFilter, WorkerConfig, TaskHandler, TaskHandlers, CronSchedule, CronItem, RunningWorker, Jobs, JobsErrorCode

Invariants — guarantees the contract pins

  1. Importing the part performs no I/O and never throws; configuration is validated at call time and every failure surfaces as a typed JobsError — the raw driver/library error (which may carry the connection string) never appears in the message
  2. Invalid input — a blank task name, an empty connection string, or an empty task map — fails fast with JobsError('invalid_input') and performs no work
  3. enqueue records a job through the SqlExecutor seam (the part-owned add_job function), so it is serverless-safe and can run inside the app's own transaction; it returns the job id and run_at and the job is immediately visible to a worker
  4. A worker runs the app-provided handler for each due job and removes it on success; the same task map drives both the long-running worker (runWorker) and the serverless drain (drainOnce)
  5. A failing job is retried with capped exponential backoff up to maxAttempts, then stops being retried — the dead-letter; listFailed surfaces exactly the jobs that exhausted their attempts, with their last error
  6. Enqueuing twice with the same jobKey yields a single job, not two (idempotent enqueue)
  7. jobs.cron: a cron schedule registers recurring jobs that the worker executes on their pattern (a missed run within the backfill window is enqueued on startup), and an invalid cron pattern is rejected at config time with JobsError('invalid_input')
  8. The part owns the graphile_worker schema via its shipped migration and the worker's boot-time migration is a verified no-op against it; every statement the enqueue/read seam issues is fully parameterized and targets only that schema

Owns in your Postgres

graphile_worker._private_jobsgraphile_worker._private_tasksgraphile_worker._private_job_queuesgraphile_worker._private_known_crontabsgraphile_worker.migrations

Dependencies

graphile-worker ^0.16.0
SOURCEparts/jobs.queue/17 files · click to read
parts/jobs.queue/src/index.tstypescript · 3,143 bytes
/**
 * jobs.queue — public interface. The ONLY legal import surface.
 * Contract: ../contract.json · What your app must provide: ../seams.md
 *
 * Provides jobs.queue@1 (durable background jobs with retry/backoff/dead-letter)
 * and jobs.cron@1 (recurring scheduled jobs). enqueue + the dead-letter read run
 * through the app-provided SqlExecutor seam (serverless-safe, transactional); the
 * worker shapes wrap graphile-worker. Importing this module performs no I/O and
 * never statically loads graphile-worker (the worker loads it on demand).
 */
import { JobsError } from "./internal/errors";
import { ENQUEUE_SQL, LIST_FAILED_SQL, rowToEnqueued, rowToFailedJob } from "./internal/sql";
import type {
  EnqueueInput,
  EnqueuedJob,
  FailedFilter,
  FailedJob,
  Jobs,
  SqlExecutor,
} from "./internal/types";
import { validateEnqueue, validateFailedFilter } from "./internal/validate";

export { JobsError } from "./internal/errors";
export type { JobsErrorCode } from "./internal/errors";
export { drainOnce, runWorker } from "./internal/worker";
export type {
  CronItem,
  CronSchedule,
  EnqueueInput,
  EnqueuedJob,
  FailedFilter,
  FailedJob,
  JobKeyMode,
  Jobs,
  RunningWorker,
  SqlExecutor,
  TaskHandler,
  TaskHandlers,
  WorkerConfig,
} from "./internal/types";

/**
 * Bind the enqueue + dead-letter read operations to a database connection (the
 * SqlExecutor seam). Constructing it performs no I/O and never throws — input is
 * validated, and the database touched, only when a method runs (contract
 * invariant 1, serverless-safe). Pass a per-request executor from your pool; an
 * enqueue runs inside whatever transaction that executor carries.
 */
export function jobs(db: SqlExecutor): Jobs {
  return {
    enqueue: (input: EnqueueInput): Promise<EnqueuedJob> => enqueue(db, input),
    listFailed: (filter?: FailedFilter): Promise<FailedJob[]> => listFailed(db, filter ?? {}),
  };
}

async function enqueue(db: SqlExecutor, input: EnqueueInput): Promise<EnqueuedJob> {
  const v = validateEnqueue(input); // throws JobsError('invalid_input') before any SQL
  let result: { rows: Record<string, unknown>[] };
  try {
    result = await db.query(ENQUEUE_SQL, [
      v.task,
      v.payloadJson,
      v.queueName,
      v.runAt,
      v.maxAttempts,
      v.jobKey,
      v.priority,
      v.jobKeyMode,
    ]);
  } catch (e) {
    throw new JobsError("storage", "failed to enqueue job", { cause: e });
  }
  const row = result.rows[0];
  if (row === undefined) {
    throw new JobsError(
      "storage",
      "enqueue returned no row — is the graphile_worker migration applied (partkit migrate)?",
    );
  }
  return rowToEnqueued(row, v.task);
}

async function listFailed(db: SqlExecutor, filter: FailedFilter): Promise<FailedJob[]> {
  const v = validateFailedFilter(filter); // throws JobsError('invalid_input') before any SQL
  let result: { rows: Record<string, unknown>[] };
  try {
    result = await db.query(LIST_FAILED_SQL, [v.task, v.limit]);
  } catch (e) {
    throw new JobsError("storage", "failed to list failed jobs", { cause: e });
  }
  return result.rows.map(rowToFailedJob);
}