Built-in Message Queue
Description
The message queue is a DBMS subsystem that sends typed messages and delivers them to interested (subscribed) clients. There are two types of queues: in-memory and persistent. An in-memory queue keeps messages exclusively in memory until all subscribed clients have processed them. A persistent queue can store data on disk, so the data remains available after the database restarts.
Currently, only the in-memory queue is implemented. It stores items in a fixed-size buffer; when the buffer is full, the qhb_enqueue function overwrites the oldest items.
Required System Configuration
Because a dedicated DBMS system process acts as the queue broker, you need to account for the maximum allowed number of such processes, which is set by the max_worker_processes configuration parameter. Choose the value based on the number of brokers that will handle queues on this QHB cluster.
If additional server processes cannot be started, the start_mq_broker() function returns an error.
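The check described above can be sketched as follows. This is a minimal illustration that assumes QHB accepts the standard SHOW command for configuration parameters and that the broker functions are called through a plain SELECT; the shape of their return values is not specified here, and the assumption that the broker is started for the current database is mine.

```sql
-- Check how many background worker processes the cluster allows.
SHOW max_worker_processes;

-- Start the broker (assumed to apply to the current database).
-- This call returns an error if no additional server process can be started.
SELECT start_mq_broker();

-- Ask the broker for its status.
SELECT status_mq_broker();
```

If the limit turns out to be too low, raise max_worker_processes in qhb.conf and restart the DBMS before starting more brokers.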
Launching the Message Queue Broker at DBMS Startup
To launch the message queue broker at DBMS startup, add the with_mq_broker parameter to the qhb.conf file and specify the databases and users for which to launch it. Both the database and the user must exist, and the user must have the rights needed to work with the database; otherwise DBMS startup is aborted.
# Add automatic message queue broker startup for databases db_name1 and db_name2.
# The outer double quotes keep the single quotes around the value in qhb.conf.
echo "with_mq_broker = 'db_owner1:db_name1, db_owner2:db_name2'" >> "$PGDATA/qhb.conf"
# Alternatively, add automatic message queue broker startup for database db_name1 only
echo "with_mq_broker = 'db_owner1:db_name1'" >> "$PGDATA/qhb.conf"
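Because a missing database or user aborts DBMS startup, it may be worth verifying both before editing qhb.conf. The sketch below assumes that QHB exposes the standard PostgreSQL catalogs pg_database and pg_roles and the has_database_privilege function. CONNECT is only one example of the rights the user may need.

```sql
-- The database must exist.
SELECT datname FROM pg_database WHERE datname = 'db_name1';

-- The role must exist.
SELECT rolname FROM pg_roles WHERE rolname = 'db_owner1';

-- The role must at least be able to connect to the database.
SELECT has_database_privilege('db_owner1', 'db_name1', 'CONNECT');
```

The first two queries return no rows when the object is missing; the third returns false when the role cannot connect.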
Functions for Working with Message Queues
| Name | Description |
|---|---|
| status_mq_broker() | Request broker status |
| start_mq_broker() | Start the message queue broker while the DBMS is running |
| stop_mq_broker() | Stop the message queue broker |
| qhb_create_queue(name,typeoid) | Create a queue of the specified type |
| qhb_delete_queue(name) | Delete a queue |
| qhb_register(queue,subscriber,callback) | Register a subscriber to process new messages |
| qhb_unregister(queue,subscriber) | Delete a subscriber |
| qhb_enqueue(queue,data) | Add an item to the queue |
| qhb_dequeue() | Retrieve an item from a queue (available from a subscribed handler only) |
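
As a rough illustration of how these functions fit together, the sketch below walks through one queue's life cycle. The names demo_queue and demo_subscriber are hypothetical, the procedure callback is assumed to exist already, and passing the queue, subscriber, and callback as text values in positional notation is an assumption about the function signatures.

```sql
-- Create a queue whose items are of type text.
SELECT qhb_create_queue(name := 'demo_queue', typeoid := oid)
FROM pg_type
WHERE typname = 'text';

-- Subscribe an existing procedure named callback to the queue.
SELECT qhb_register('demo_queue', 'demo_subscriber', 'callback');

-- Send a message; it is delivered to the subscriber asynchronously.
SELECT qhb_enqueue('demo_queue', 'hello'::text);

-- Clean up once the messages have been processed.
SELECT qhb_unregister('demo_queue', 'demo_subscriber');
SELECT qhb_delete_queue('demo_queue');
```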
Message Queue Broker
The message queue broker is a separate process that provides an internal interface for working with a queue. To work with a queue, the broker must be running: either start it explicitly with the start_mq_broker() function or have it launched at DBMS startup through the with_mq_broker parameter. Along with the broker, a worker process is started; this worker actually calls the handlers and can restart if an error occurs. Both processes are implemented as built-in background workers, so their PIDs can be found in the pg_stat_activity view.
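One way to locate the two processes is to list everything in pg_stat_activity that is not an ordinary client session. This sketch assumes the view has the backend_type column found in PostgreSQL 10 and later. The exact backend_type values reported for the broker and the worker are not stated here, so the query does not filter on them.

```sql
-- List background and auxiliary processes with their PIDs.
SELECT pid, datname, usename, backend_type
FROM pg_stat_activity
WHERE backend_type <> 'client backend'
ORDER BY pid;
```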
Message Processing Procedure
Messages are delivered to subscribers asynchronously by calling a user-specified procedure. To process new messages, register the processing procedure with the qhb_register function. The procedure is created with the CREATE PROCEDURE command and must take no arguments, for example:
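CREATE PROCEDURE callback()
LANGUAGE plpgsql AS $$
BEGIN
    -- Create the target table on first use
    CREATE TABLE IF NOT EXISTS my_table (x TEXT);
    -- Store the message currently being handled (here the queue holds TEXT items)
    INSERT INTO my_table
    SELECT x FROM qhb_dequeue() AS (x TEXT);
END; $$;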
The item is unconditionally removed from the queue before the handler is called, even if the handler never calls qhb_dequeue. Within a single handler invocation, every qhb_dequeue call returns the same item, no matter how many times it is called.
Any error that occurs in the procedure is logged and not processed further; the processing procedure is not removed from the subscriber list. An error raised in one handler does not affect message delivery to other handlers.
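Since a failed handler is only logged, a handler that needs its own record of failures can catch them itself. The following is a minimal sketch, not a prescribed pattern: the table handler_errors and the procedure safe_callback are hypothetical names, and the queue is assumed to hold TEXT items as in the example above.

```sql
-- Hypothetical table for recording handler failures.
CREATE TABLE IF NOT EXISTS handler_errors (
    failed_at TIMESTAMPTZ DEFAULT now(),
    message   TEXT
);

CREATE PROCEDURE safe_callback()
LANGUAGE plpgsql AS $$
BEGIN
    INSERT INTO my_table
    SELECT x FROM qhb_dequeue() AS (x TEXT);
EXCEPTION WHEN OTHERS THEN
    -- The failed INSERT is rolled back; keep the error text for later review.
    INSERT INTO handler_errors (message) VALUES (SQLERRM);
END; $$;
```

The message itself is still lost in this case, because the item has already been removed from the queue before the handler runs.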
IMPORTANT!
There are no restrictions on the use of queue functions in the message handling procedure. In addition to receiving a message with qhb_dequeue, the handler can add a message to the queue, create or delete a queue, register or remove a subscriber, and so on. Be very careful when calling any queue function other than qhb_dequeue from a handler: adding a message to the queue inside a message handler can lead to uncontrolled system behavior, unexpected side effects, and loss of queued messages through overwriting. The best practice is for a message handling procedure to use no queue function other than qhb_dequeue.
Typing
A queue is strongly typed and can contain items of only one type; this type is explicitly specified when the queue is created. For example, a queue with the name my_queue1 and the type int4 can be created as follows:
SELECT qhb_create_queue(name := 'my_queue1', typeoid := oid)
FROM pg_type
WHERE typname = 'int4';
All data types that implement the RECEIVE and SEND functions are supported.
Calling qhb_enqueue or qhb_dequeue with a type that does not match the queue type results in a corresponding error.
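The type check can be illustrated with a small sketch. The queue name my_queue2 is hypothetical. The regtype cast is standard PostgreSQL syntax for obtaining a type OID, and it is assumed here that QHB accepts it and that the typeoid parameter is of type oid. The positional text argument for the queue name is likewise an assumption.

```sql
-- Create an int4 queue, looking up the type OID with a regtype cast.
SELECT qhb_create_queue(name := 'my_queue2', typeoid := 'int4'::regtype::oid);

-- Accepted: the value has the queue's type.
SELECT qhb_enqueue('my_queue2', 42::int4);

-- Expected to fail with a type error: the queue holds int4, not text.
SELECT qhb_enqueue('my_queue2', 'forty-two'::text);
```

A handler for this queue would read the item with a matching column definition, for example qhb_dequeue() AS (x int4).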
Transactionality Support
The queue supports DBMS transactionality: a message sent with qhb_enqueue inside a transaction is actually sent only after the current transaction completes successfully. If the transaction ends with the ROLLBACK command or with an unhandled exception, the data is not delivered to the queue and is discarded. The handler remains registered in either case.
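The two outcomes can be sketched as follows, assuming the int4 queue my_queue1 from the Typing section exists and that qhb_enqueue takes the queue name as a text value.

```sql
-- Rolled-back transaction: the message never reaches the queue.
BEGIN;
SELECT qhb_enqueue('my_queue1', 1::int4);
ROLLBACK;

-- Committed transaction: the message is sent once COMMIT succeeds.
BEGIN;
SELECT qhb_enqueue('my_queue1', 2::int4);
COMMIT;
```

Under the behavior described above, subscribers of my_queue1 would receive only the value 2.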
Internal State of the Message Queue
The queue state is stored separately for each database in the system tables qhb_queue and qhb_queue_consumer. The first contains the list of queues and their types; the second contains the list of subscribed handlers and the queues they are bound to. Use these tables for informational purposes only, because changing them directly leads to an inconsistent state.
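For read-only inspection, the tables can be queried like any other. The column names are not listed here, so this sketch selects all columns; it also assumes that both tables are reachable without schema qualification.

```sql
-- Queues defined in the current database and their types.
SELECT * FROM qhb_queue;

-- Subscribed handlers and the queues they are bound to.
SELECT * FROM qhb_queue_consumer;
```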