May 4, 2026

A jobs queue in Postgres, in 60 lines

#postgres #drizzle #typescript #engineering

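Most teams pull in Redis or a job runner the moment they need to defer work. They don't need to. Postgres has had the key piece since version 9.5: SELECT ... FOR UPDATE SKIP LOCKED. (The schema below also uses gen_random_uuid(), which is built in from Postgres 13 onward; older versions need the pgcrypto extension.)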

Here's the entire queue, top to bottom.

Schema

CREATE TYPE job_status AS ENUM ('queued', 'running', 'done', 'failed');
 
CREATE TABLE jobs (
  id           uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  kind         text NOT NULL,
  payload      jsonb NOT NULL,
  status       job_status NOT NULL DEFAULT 'queued',
  attempts     int NOT NULL DEFAULT 0,
  max_attempts int NOT NULL DEFAULT 3,
  run_at       timestamptz NOT NULL DEFAULT now(),
  error        text,
  created_at   timestamptz NOT NULL DEFAULT now(),
  updated_at   timestamptz NOT NULL DEFAULT now()
);
 
CREATE INDEX jobs_dequeue_idx ON jobs (status, run_at)
  WHERE status = 'queued';

The partial index is the part most people miss. It contains only queued rows, so the planner never walks the full history, as long as the dequeue query filters on status = 'queued'. Your dequeue stays fast even when the table is millions of rows deep.

Same thing in Drizzle

import { sql } from 'drizzle-orm'
import {
  index,
  integer,
  jsonb,
  pgEnum,
  pgTable,
  text,
  timestamp,
  uuid,
} from 'drizzle-orm/pg-core'
 
export const jobStatus = pgEnum('job_status', ['queued', 'running', 'done', 'failed'])
 
export const jobs = pgTable(
  'jobs',
  {
    id: uuid('id').primaryKey().default(sql`gen_random_uuid()`),
    kind: text('kind').notNull(),
    payload: jsonb('payload').notNull(),
    status: jobStatus('status').notNull().default('queued'),
    attempts: integer('attempts').notNull().default(0),
    maxAttempts: integer('max_attempts').notNull().default(3),
    runAt: timestamp('run_at', { withTimezone: true }).notNull().defaultNow(),
    error: text('error'),
    createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
    updatedAt: timestamp('updated_at', { withTimezone: true }).notNull().defaultNow(),
  },
  (t) => ({
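    dequeueIdx: index('jobs_dequeue_idx')
      .on(t.status, t.runAt)
      // Partial index, matching the WHERE clause in the SQL schema above.
      .where(sql`status = 'queued'`),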
  }),
)

Enqueue

import { db } from '@/lib/db'
import { jobs } from '@/lib/schema'
 
export async function enqueue<T>(kind: string, payload: T, runAt?: Date) {
  const [row] = await db
    .insert(jobs)
    .values({ kind, payload, runAt })
    .returning({ id: jobs.id })
  return row?.id
}

Dequeue

This is the whole point. SKIP LOCKED lets multiple workers grab disjoint rows without ever blocking on each other.

import { sql } from 'drizzle-orm'
import { db } from '@/lib/db'
 
export type Job = {
  id: string
  kind: string
  payload: unknown
  attempts: number
  maxAttempts: number
}
 
export async function dequeue(): Promise<Job | null> {
  const rows = await db.execute<Job>(sql`
    UPDATE jobs
    SET status = 'running',
        attempts = attempts + 1,
        updated_at = now()
    WHERE id = (
      SELECT id FROM jobs
      WHERE status = 'queued' AND run_at <= now()
      ORDER BY run_at
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id, kind, payload, attempts, max_attempts AS "maxAttempts"
  `)
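  // postgres-js returns an array-like result. With node-postgres, db.execute
  // returns a QueryResult, so read rows.rows[0] instead.
  return rows[0] ?? null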
}
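
To make the skipping behaviour concrete, here is a toy in-memory model of what the subquery does. It is not Postgres and shares no code with the queue above: the Row type, claim, and commit are hypothetical names invented for the illustration, and the lockedBy field stands in for a row lock held by an open transaction.

```typescript
type Row = {
  id: number
  status: 'queued' | 'running'
  runAt: number
  lockedBy: string | null
}

// Toy model of the dequeue subquery: pick the earliest due, queued row that no
// other worker holds a lock on. Locked rows are skipped, not waited for.
function claim(rows: Row[], worker: string, now: number): Row | null {
  const candidate = rows
    .filter((r) => r.status === 'queued' && r.runAt <= now && r.lockedBy === null)
    .sort((x, y) => x.runAt - y.runAt)[0]
  if (!candidate) return null
  candidate.lockedBy = worker // stands in for FOR UPDATE taking the row lock
  return candidate
}

// Committing applies the status change and releases the lock.
function commit(row: Row): void {
  row.status = 'running'
  row.lockedBy = null
}

const rows: Row[] = [
  { id: 1, status: 'queued', runAt: 10, lockedBy: null },
  { id: 2, status: 'queued', runAt: 20, lockedBy: null },
]

// All three workers are mid-transaction at the same moment.
const a = claim(rows, 'worker-a', 100)
const b = claim(rows, 'worker-b', 100) // skips row 1, which worker-a has locked
const c = claim(rows, 'worker-c', 100) // every due row is locked, so nothing to do
if (a) commit(a)
if (b) commit(b)

console.log(a?.id, b?.id, c) // 1 2 null
```

Without SKIP LOCKED, worker-b would instead wait on row 1 until worker-a committed, which is the blocking the real query avoids.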

Worker

import { eq } from 'drizzle-orm'
import { db } from '@/lib/db'
import { jobs } from '@/lib/schema'
import { dequeue, type Job } from './dequeue'
 
type Handler = (payload: unknown) => Promise<void>
 
export function startWorker(handlers: Record<string, Handler>, pollMs = 1000) {
  let stopped = false
 
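  async function tick() {
    while (!stopped) {
      try {
        const job = await dequeue()
        if (job) {
          await run(job, handlers)
          continue
        }
      } catch (err) {
        // A failed dequeue or status update (e.g. a dropped connection)
        // must not kill the loop; log it and poll again.
        console.error('worker tick failed', err)
      }
      await sleep(pollMs)
    }
  }
 
  void tick()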
  return () => {
    stopped = true
  }
}
 
async function run(job: Job, handlers: Record<string, Handler>) {
  const handler = handlers[job.kind]
  if (!handler) {
    await fail(job, `no handler for kind: ${job.kind}`)
    return
  }
  try {
    await handler(job.payload)
    await db
      .update(jobs)
      .set({ status: 'done', updatedAt: new Date() })
      .where(eq(jobs.id, job.id))
  } catch (err) {
    await fail(job, err instanceof Error ? err.message : String(err))
  }
}
 
async function fail(job: Job, error: string) {
  const exhausted = job.attempts >= job.maxAttempts
  await db
    .update(jobs)
    .set({
      status: exhausted ? 'failed' : 'queued',
      error,
      runAt: exhausted ? undefined : new Date(Date.now() + backoffMs(job.attempts)),
      updatedAt: new Date(),
    })
    .where(eq(jobs.id, job.id))
}
 
function backoffMs(attempts: number) {
  return Math.min(60_000, 2 ** attempts * 1000)
}
 
function sleep(ms: number) {
  return new Promise((r) => setTimeout(r, ms))
}
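
To see what the retry timing works out to, here is a small worked example. It copies backoffMs from the worker; backoffSchedule is a hypothetical helper added only for illustration. It relies on the fact that attempts is incremented at dequeue time, so the first failure sees attempts = 1.

```typescript
// Same formula as backoffMs in the worker: doubles each attempt, capped at 60 s.
function backoffMs(attempts: number): number {
  return Math.min(60_000, 2 ** attempts * 1000)
}

// Delay applied after each failed attempt, for attempts 1 through maxAttempts - 1.
// The final attempt marks the job failed instead of rescheduling it.
function backoffSchedule(maxAttempts: number): number[] {
  const delays: number[] = []
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(backoffMs(attempt))
  }
  return delays
}

console.log(backoffSchedule(3)) // [ 2000, 4000 ]
console.log(backoffSchedule(8)) // [ 2000, 4000, 8000, 16000, 32000, 60000, 60000 ]
```

With the default max_attempts of 3, a job that keeps failing is retried after 2 seconds and again after 4 seconds, then marked failed. The 60-second ceiling only comes into play from the sixth attempt onward.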

Use it

const stop = startWorker({
  'send-email': async (payload) => {
    const { to, subject, body } = payload as {
      to: string
      subject: string
      body: string
    }
    await resend.emails.send({ from, to, subject, text: body })
  },
  'reindex-product': async (payload) => {
    const { productId } = payload as { productId: string }
    const product = await loadProduct(productId)
    await meilisearch.index('products').addDocuments([product])
  },
})
 
await enqueue('send-email', {
  to: 'a@b.com',
  subject: 'hi',
  body: 'thanks for signing up',
})
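
The handlers above cast the payload and trust its shape. Since payload comes back from a jsonb column as unknown, a runtime check may be worth the few extra lines. This is a minimal hand-rolled sketch; SendEmailPayload, isSendEmailPayload, and parseSendEmailPayload are hypothetical names, and a schema library would do the same job.

```typescript
type SendEmailPayload = { to: string; subject: string; body: string }

// Narrow an unknown jsonb payload to the shape the 'send-email' handler expects.
function isSendEmailPayload(value: unknown): value is SendEmailPayload {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v['to'] === 'string' &&
    typeof v['subject'] === 'string' &&
    typeof v['body'] === 'string'
  )
}

// Throwing here sends the job down the worker's existing catch path,
// so the error text ends up in the jobs.error column.
function parseSendEmailPayload(value: unknown): SendEmailPayload {
  if (!isSendEmailPayload(value)) {
    throw new Error('invalid send-email payload')
  }
  return value
}

const ok = parseSendEmailPayload({ to: 'a@b.com', subject: 'hi', body: 'thanks' })
console.log(ok.to) // a@b.com
```

One caveat: a malformed payload will never become valid, so with the worker as written it is still retried until max_attempts is exhausted.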

What you get

Transactional enqueue. Your job lives or dies with the transaction that created it, provided the insert runs on that transaction's handle rather than the global db.

Crash safe. A worker dying mid-job leaves the row in running, and a separate sweep query, not included in the code above, can reset rows that have been running for too long.

Exponential backoff with a 60-second ceiling.

Safe with multiple workers by default.

Observable in plain SQL: SELECT status, count(*) FROM jobs GROUP BY status is the dashboard.
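
The sweep for stuck rows is not part of the queue code, so here is one way it might look. Everything in it is an assumption: buildSweepQuery is a hypothetical helper, the { text, values } shape is the one node-postgres accepts for parameterized queries, and the 'worker timed out' message is made up. It relies on updated_at being set at dequeue time and on the timeout being comfortably longer than your slowest handler.

```typescript
// Hypothetical helper: builds the sweep as a parameterized query.
// Nothing here touches a database; pass the result to your own client.
type SweepQuery = { text: string; values: [number] }

function buildSweepQuery(staleAfterSeconds: number): SweepQuery {
  if (!Number.isFinite(staleAfterSeconds) || staleAfterSeconds <= 0) {
    throw new RangeError('staleAfterSeconds must be a positive number')
  }
  // Rows still marked running after the timeout are assumed to belong to a
  // dead worker. Jobs with attempts left go back to queued; the rest fail.
  // The casts are needed because a CASE over bare string literals is typed
  // as text, which Postgres will not assign to an enum column.
  const text = `
    UPDATE jobs
    SET status = CASE
          WHEN attempts >= max_attempts THEN 'failed'::job_status
          ELSE 'queued'::job_status
        END,
        error = 'worker timed out',
        updated_at = now()
    WHERE status = 'running'
      AND updated_at < now() - make_interval(secs => $1)
    RETURNING id, status`
  return { text, values: [staleAfterSeconds] }
}

const sweep = buildSweepQuery(600) // treat jobs running for over 10 minutes as dead
console.log(sweep.values) // [ 600 ]
```

Run it on a timer from any one process. A job reset this way may already have done part of its work, so handlers need to tolerate running twice.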

What you don't get: thousands of jobs per second. If you need that, fine. Reach for the dedicated tool. But almost no team I've worked with actually needs that. They need maybe a few hundred a minute. They spend three sprints wiring up Redis and a job runner to get there.

Sixty lines of TypeScript and one table. The boring stack wins again.