888 888b d888 .d88888b. 888 8888b d8888 d88P" "Y88b 888 88888b.d88888 888 888 888 .d88b. 88888b. 88888b. 888 888 888Y88888P888 888 888 888 d88""88b 888 "88b 888 "88b 888 888 888 Y888P 888 888 888 888 888 888 888 888 888 888 888 888 888 Y8P 888 888 Y8b 888 888 Y88..88P 888 888 888 888 Y88b 888 888 " 888 Y88b.Y8b88P 88888888 "Y88P" 888 888 888 888 "Y88888 888 888 "Y888888" 888 Y8b Y8b d88P "Y88P"
Scheduling
We now have a performant, albeit bare-bones, PostgreSQL message queue. This is where many articles stop, but we can go further. The first improvement is simple scheduling: the ability to specify a timestamp before which a message should not be dequeued.
To do this, we make a small addition to our message table: a dequeue_at field:
CREATE TABLE message (
id BIGSERIAL PRIMARY KEY,
content BYTEA NOT NULL,
dequeue_at TIMESTAMPTZ NOT NULL
);Our message creation function now accepts a scheduled dequeue time. If one is not provided, we default to NOW():
CREATE FUNCTION message_create (
p_content BYTEA,
p_dequeue_at TIMESTAMPTZ
) RETURNS VOID AS $$
DECLARE
v_dequeue_at TIMESTAMPTZ;
BEGIN
v_dequeue_at := COALESCE(p_dequeue_at, NOW());
INSERT INTO "message" (content, dequeue_at)
VALUES (p_content, v_dequeue_at);
END;
$$ LANGUAGE plpgsqlDequeue is now changed to only return messages that have elapsed their scheduled waiting time. We will now also perform a lexicographical sort by dequeue_at and then id to ensure the oldest messages are processed first.
CREATE FUNCTION message_dequeue ()
RETURNS BYTEA
AS $$
DECLARE
v_message RECORD;
BEGIN
-- Capture a message
SELECT "id", "content" FROM "message"
WHERE "dequeue_at" <= NOW()
ORDER BY "dequeue_at" ASC, "id" ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
INTO v_message;
-- If there is nothing to capture return a NULL
IF v_message."id" IS NULL THEN
RETURN NULL::BYTEA;
END IF;
-- If we've captured a message, delete it and return its content
DELETE FROM "message"
WHERE "id" = v_message."id";
RETURN v_message."content";
END;
$$ LANGUAGE plpgsqlKeen-eyed observers may already be groaning. In the previous section, I warned against using timestamps for FIFO ordering, and now it looks like I am doing exactly that. However, within a transaction, NOW() is stable. All messages enqueued in the same transaction will have the same dequeue_at, so their relative order is still determined by id. FIFO ordering within a single transaction is preserved.
One thing is still missing: an explicit index. Without it, dequeue would need to scan through the table to find the lowest available dequeue_at. We avoid that with:
CREATE INDEX message_dequeue_ix
ON "message" ("dequeue_at" ASC, "id" ASC);