Building LonnyMQ
PostgreSQL can work surprisingly well as a message queue. There are already plenty of articles showing how to build a basic queue with SELECT FOR UPDATE SKIP LOCKED.
I want to talk through my own experience building a PostgreSQL-backed message queue that goes beyond a simple FIFO implementation and explores features such as:
- Scheduled messages
- Message prioritisation
- Multi-tenant fairness and concurrency constraints
- Durability
The goal is to keep the solution small without compromising dequeue performance: every dequeue should be served by an index lookup, not a linear scan.
Why PostgreSQL?
Before we dive in, it is worth answering the obvious question: "Why use PostgreSQL as a message queue instead of something like Redis or RabbitMQ?" PostgreSQL has a few useful properties:
- Assuming you are already using PostgreSQL (not unlikely), it keeps your infrastructure simple and cheap.
- Queue actions can piggy-back on existing database transactions, allowing them to happen in lock-step with your business logic and eliminating a large class of race conditions that appear when queue state lives somewhere else.
- Raw message throughput can easily exceed 1_000 messages per second, which is plenty fast for many workloads.
- If queue state lives in the database, it can be inspected and monitored using SQL queries and familiar database tooling.
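To make the transactional point concrete, here is a minimal sketch. It assumes a hypothetical orders table and a bare-bones demo_queue table; both names are illustrative and are not part of LonnyMQ. The business row and its message commit or roll back together, and the queue can then be inspected with ordinary SQL.

```sql
-- Hypothetical tables, for illustration only.
CREATE TABLE orders (
    id BIGSERIAL PRIMARY KEY,
    customer TEXT NOT NULL
);

CREATE TABLE demo_queue (
    id BIGSERIAL PRIMARY KEY,
    content BYTEA NOT NULL
);

-- The business write and the enqueue commit together.
BEGIN;
INSERT INTO orders (customer) VALUES ('alice');
INSERT INTO demo_queue (content)
VALUES (convert_to('order created: alice', 'UTF8'));
COMMIT;

-- A rolled-back transaction leaves neither row behind, so a worker can
-- never see a message for an order that does not exist.
BEGIN;
INSERT INTO orders (customer) VALUES ('bob');
INSERT INTO demo_queue (content)
VALUES (convert_to('order created: bob', 'UTF8'));
ROLLBACK;

-- Queue state is just rows, so plain SQL is enough to inspect it.
-- Only the message for 'alice' is pending here.
SELECT count(*) AS pending, min(id) AS oldest_id
FROM demo_queue;
```

With an external broker, the same guarantee would usually need an outbox table or some other coordination between the two systems.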
Our implementation
We will implement our message queue with plpgsql functions that drive queue actions such as message create/dequeue. There are a few benefits to keeping the core logic in the database:
- Language-specific implementations become lightweight bindings. The business logic lives in installed plpgsql functions, and each client only needs to pass data in and out.
- Multi-step queue actions do not need to shuttle intermediate state back and forth over the network.
- Each function call runs inside a single transaction context, so multi-step actions can remain atomic without every client binding needing explicit transaction support.
A simple FIFO queue (quick refresher)
We will begin by building a simple, performant FIFO queue that uses FOR UPDATE SKIP LOCKED.
First, we create a table to store our messages:
CREATE TABLE message (
    id BIGSERIAL PRIMARY KEY,
    content BYTEA NOT NULL
);

Enqueueing a message is just an insert. Using a plpgsql function feels like overkill here, but it gives us a consistent shape to build on:
CREATE FUNCTION message_create (
    p_content BYTEA
) RETURNS VOID AS $$
BEGIN
    INSERT INTO "message" (content)
    VALUES (p_content);
END;
$$ LANGUAGE plpgsql;

A dequeue captures a message using FOR UPDATE so concurrent dequeues cannot return the same row. SKIP LOCKED prevents workers from being head-of-line blocked by a row another worker has already locked. Ordering by id gives us FIFO behaviour for messages created within a transaction because IDs assigned within that transaction are monotonic. It is also efficient because the primary key gives us a B-tree index that can find the lowest id quickly.
CREATE FUNCTION message_dequeue ()
RETURNS BYTEA
AS $$
DECLARE
    v_message RECORD;
BEGIN
    -- Capture the oldest message that no other worker has locked
    SELECT "id", "content" FROM "message"
    ORDER BY "id" ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
    INTO v_message;

    -- If there is nothing to capture, return NULL
    IF v_message."id" IS NULL THEN
        RETURN NULL::BYTEA;
    END IF;

    -- If we've captured a message, delete it and return its content
    DELETE FROM "message"
    WHERE "id" = v_message."id";

    RETURN v_message."content";
END;
$$ LANGUAGE plpgsql;

Using BIGSERIAL is better than using a timestamp field for FIFO ordering. If we used a created_at timestamp set to CURRENT_TIMESTAMP, all messages created within the same transaction would have the same timestamp, because CURRENT_TIMESTAMP returns the time at which the transaction started, so their order would be undefined. IDs assigned by BIGSERIAL are monotonic within the transaction, giving those messages a stable relative order. CLOCK_TIMESTAMP() helps a little, since it returns the actual wall-clock time of each call, but it still leaves us vulnerable to the system clock changing unexpectedly, for example if an NTP update moves the clock backwards during a transaction.
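The difference is easy to observe. The following sketch uses a hypothetical scratch table, ts_demo, with both a BIGSERIAL id and a created_at column that defaults to CURRENT_TIMESTAMP. The table and column names are assumptions made for illustration and are not part of the queue schema.

```sql
-- Hypothetical scratch table, for illustration only.
CREATE TABLE ts_demo (
    id BIGSERIAL PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    label TEXT NOT NULL
);

-- Three inserts inside one transaction.
BEGIN;
INSERT INTO ts_demo (label) VALUES ('first');
INSERT INTO ts_demo (label) VALUES ('second');
INSERT INTO ts_demo (label) VALUES ('third');
COMMIT;

-- CURRENT_TIMESTAMP is fixed at transaction start, so all three rows share
-- a single created_at value, while the ids still record insertion order.
SELECT count(DISTINCT created_at) AS distinct_timestamps,
       array_agg(label ORDER BY id) AS labels_by_id
FROM ts_demo;
```

Here distinct_timestamps is 1 and labels_by_id is {first,second,third}: ordering by created_at alone could not tell these rows apart, whereas ordering by id recovers the order in which they were inserted.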