Background Worker Processes
QHB can be extended to run user-supplied code in separate processes. Such processes are started, stopped and monitored by qhb, which permits them to have a lifetime closely linked to the server's status. These processes are attached to QHB's shared memory area and have the option to connect to databases internally; they can also run multiple transactions serially, just like a regular client-connected server process. Also, by linking to libpq they can connect to the server and behave like a regular client application.
WARNING
There are considerable robustness and security risks in using background worker processes because, being written in C or Rust, they have unrestricted access to data. Administrators wishing to enable modules that include background worker processes should exercise extreme caution. Only carefully audited modules should be permitted to run background worker processes.
Background workers can be initialized at the time that QHB is started by including the module name in shared_preload_libraries. A module wishing to run a background worker can register it by calling RegisterBackgroundWorker(BackgroundWorker *worker) from its _PG_init() function. Background workers can also be started after the system is up and running by calling RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle). Unlike RegisterBackgroundWorker, which can only be called from within the qhbmaster process, RegisterDynamicBackgroundWorker must be called from a regular backend or another background worker.
The structure BackgroundWorker is defined thus:
typedef void (*bgworker_main_type)(Datum main_arg);
typedef struct BackgroundWorker
{
    char        bgw_name[BGW_MAXLEN];
    char        bgw_type[BGW_MAXLEN];
    int         bgw_flags;
    BgWorkerStartTime bgw_start_time;
    int         bgw_restart_time;       /* in seconds, or BGW_NEVER_RESTART */
    char        bgw_library_name[BGW_MAXLEN];
    char        bgw_function_name[BGW_MAXLEN];
    Datum       bgw_main_arg;
    char        bgw_extra[BGW_EXTRALEN];
    pid_t       bgw_notify_pid;
} BackgroundWorker;
bgw_name and bgw_type are strings to be used in log messages, process listings and similar contexts. bgw_type should be the same for all background workers of the same type, so that it is possible to group such workers in a process listing, for example. bgw_name on the other hand can contain additional information about the specific process. (Typically, the string for bgw_name will contain the type somehow, but that is not strictly required.)
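As a rough sketch of filling these two fields, the code below uses a hypothetical stand-in struct, DemoWorker, and an assumed buffer length of 96 (check bgworker.h for the real BGW_MAXLEN). Using snprintf keeps both strings null-terminated even when the text is too long for the buffer.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Assumed value; the real BGW_MAXLEN lives in the server's bgworker.h. */
#define DEMO_BGW_MAXLEN 96

/* Hypothetical stand-in holding only the two naming fields. */
typedef struct DemoWorker
{
    char bgw_name[DEMO_BGW_MAXLEN];
    char bgw_type[DEMO_BGW_MAXLEN];
} DemoWorker;

/*
 * Give every worker of one kind the same bgw_type, and a bgw_name that
 * also identifies the individual worker by its index.
 */
void
demo_set_worker_names(DemoWorker *w, const char *type, int index)
{
    /* snprintf never writes past the buffer and always terminates it */
    snprintf(w->bgw_type, sizeof(w->bgw_type), "%s", type);
    snprintf(w->bgw_name, sizeof(w->bgw_name), "%s worker %d", type, index);
}
```

With type "worker_spi" and index 2 this yields the type "worker_spi" and the name "worker_spi worker 2", the same pattern the example module further down uses.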
bgw_flags is a bitwise-or'd bit mask indicating the capabilities that the module wants. Possible values are:
BGWORKER_SHMEM_ACCESS
Requests shared memory access. This flag is required.
BGWORKER_BACKEND_DATABASE_CONNECTION
Requests the ability to establish a database connection through which it can later run transactions and queries. A background worker using BGWORKER_BACKEND_DATABASE_CONNECTION to connect to a database must also attach shared memory using BGWORKER_SHMEM_ACCESS, or worker start-up will fail.
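The flag rules above can be checked mechanically. This is a sketch with hypothetical DEMO_ constants whose bit values are assumed, not copied from bgworker.h. Module code should always use the real macros and combine them with a bitwise OR.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed bit values; use the real macros from bgworker.h in module code. */
#define DEMO_SHMEM_ACCESS                0x0001
#define DEMO_BACKEND_DATABASE_CONNECTION 0x0002

/* Returns true if the flag combination follows the rules described above. */
bool
demo_bgw_flags_valid(int flags)
{
    bool has_shmem = (flags & DEMO_SHMEM_ACCESS) != 0;
    bool wants_db = (flags & DEMO_BACKEND_DATABASE_CONNECTION) != 0;

    /* A database connection is only possible on top of shared memory. */
    if (wants_db && !has_shmem)
        return false;
    /* Shared memory access itself is documented as required. */
    return has_shmem;
}
```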
bgw_start_time is the server state during which qhb should start the process; it can be one of BgWorkerStart_QhbmasterStart (start as soon as qhb itself has finished its own initialization; processes requesting this are not eligible for database connections), BgWorkerStart_ConsistentState (start as soon as a consistent state has been reached in a hot standby, allowing processes to connect to databases and run read-only queries), or BgWorkerStart_RecoveryFinished (start as soon as the system has entered normal read-write state). The last two values are equivalent in a server that is not a hot standby. This setting only indicates when the processes are to be started; they do not stop when a different state is reached.
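One way to picture these start times is as thresholds on an ordered sequence of server states. The enums below are hypothetical simplifications, not QHB types. A server that is not a hot standby goes straight from the initialized state to read-write, which is why the last two start times behave the same there.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirrors of the three BgWorkerStartTime values. */
typedef enum DemoStartTime
{
    DEMO_START_QHBMASTER_START,
    DEMO_START_CONSISTENT_STATE,
    DEMO_START_RECOVERY_FINISHED
} DemoStartTime;

/* Hypothetical server states, in the order the server passes through them. */
typedef enum DemoServerState
{
    DEMO_STATE_INITIALIZED,   /* qhb finished its own initialization */
    DEMO_STATE_CONSISTENT,    /* hot standby reached a consistent state */
    DEMO_STATE_READ_WRITE     /* normal read-write operation */
} DemoServerState;

/*
 * May a worker with the given start time be launched in this state?
 * Because the states are ordered, a worker becomes eligible once the
 * server reaches the requested state and stays eligible afterwards.
 */
bool
demo_should_start(DemoStartTime when, DemoServerState state)
{
    switch (when)
    {
        case DEMO_START_QHBMASTER_START:
            return true;
        case DEMO_START_CONSISTENT_STATE:
            return state >= DEMO_STATE_CONSISTENT;
        case DEMO_START_RECOVERY_FINISHED:
            return state >= DEMO_STATE_READ_WRITE;
    }
    return false;
}
```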
bgw_restart_time is the interval, in seconds, that qhb should wait before restarting the process in the event that it crashes. It can be any positive value, or BGW_NEVER_RESTART, indicating not to restart the process in case of a crash.
bgw_library_name is the name of a library in which the initial entry point for the background worker should be sought. The named library will be dynamically loaded by the worker process and bgw_function_name will be used to identify the function to be called. If calling a function in the core code, this must be set to "qhb".
bgw_function_name is the name of a function in a dynamically loaded library which should be used as the initial entry point for a new background worker. If this function is in a dynamically loaded library, it must be marked PGDLLEXPORT (and not static).
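For a dynamically loaded library, the function is found through the dynamic linker's symbol table. A static function has internal linkage and never appears in that table. For core code there is no library to load, so the name has to be resolved another way. The sketch below shows one plausible approach, a fixed name-to-function table. The entry points are hypothetical, and Datum is stood in for by uintptr_t. This is not QHB's actual lookup code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins: Datum modelled as an integer wide enough for a pointer. */
typedef uintptr_t DemoDatum;
typedef void (*demo_main_type)(DemoDatum main_arg);

static int demo_calls = 0;

/* Two hypothetical entry points; real worker mains never return normally. */
static void demo_launcher_main(DemoDatum arg) { (void) arg; demo_calls += 1; }
static void demo_parallel_main(DemoDatum arg) { (void) arg; demo_calls += 10; }

/* Fixed name-to-function table for entry points compiled into the server. */
static const struct
{
    const char     *name;
    demo_main_type  fn;
} demo_internal_workers[] = {
    {"demo_launcher_main", demo_launcher_main},
    {"demo_parallel_main", demo_parallel_main},
};

/* Return the entry point named by bgw_function_name, or NULL if unknown. */
demo_main_type
demo_lookup_entry_point(const char *function_name)
{
    size_t n = sizeof(demo_internal_workers) / sizeof(demo_internal_workers[0]);

    for (size_t i = 0; i < n; i++)
    {
        if (strcmp(demo_internal_workers[i].name, function_name) == 0)
            return demo_internal_workers[i].fn;
    }
    return NULL;
}
```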
bgw_main_arg is the Datum argument to the background worker main function. This main function should take a single argument of type Datum and return void. bgw_main_arg will be passed as the argument. In addition, the global variable MyBgworkerEntry points to a copy of the BackgroundWorker structure passed at registration time; the worker may find it helpful to examine this structure.
Wherever EXEC_BACKEND is defined, and in dynamic background workers, it is not safe to pass a Datum by reference, only by value. If an argument is required, it is safest to pass an int32 or other small value and use it as an index into an array allocated in shared memory. If a value such as a cstring or text is passed, the pointer will not be valid in the new background worker process.
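The index-into-shared-array pattern can be sketched without a server. Here DemoDatum, the conversion helpers and DemoWorkerSlot are hypothetical stand-ins for Datum, Int32GetDatum/DatumGetInt32 and a structure placed in shared memory. An ordinary array plays the role of the shared one.

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

/* Hypothetical stand-in: a Datum is an integer wide enough to hold a pointer. */
typedef uintptr_t DemoDatum;

/* Pack a small integer by value, so no pointer crosses the process boundary. */
static inline DemoDatum
demo_int32_get_datum(int32_t x)
{
    return (DemoDatum) (uint32_t) x;
}

static inline int32_t
demo_datum_get_int32(DemoDatum d)
{
    return (int32_t) (uint32_t) d;
}

/* Hypothetical per-worker settings that would live in shared memory. */
typedef struct DemoWorkerSlot
{
    int  naptime_secs;
    char schema[32];
} DemoWorkerSlot;

#define DEMO_MAX_WORKERS 8

/*
 * The worker receives only an index and looks up its settings in an array
 * that both the registering backend and the worker can see.
 */
const DemoWorkerSlot *
demo_slot_for_arg(const DemoWorkerSlot slots[DEMO_MAX_WORKERS], DemoDatum main_arg)
{
    int32_t index = demo_datum_get_int32(main_arg);

    if (index < 0 || index >= DEMO_MAX_WORKERS)
        return NULL;
    return &slots[index];
}
```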
bgw_extra can contain extra data to be passed to the background worker. Unlike bgw_main_arg, this data is not passed as an argument to the worker's main function, but it can be accessed via MyBgworkerEntry, as discussed above.
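The example module stores a database OID, a role OID and a flags word in bgw_extra with memcpy. The same idea is isolated below. The types and the 128-byte length are assumptions standing in for Oid, bits32 and BGW_EXTRALEN. Using memcpy rather than pointer casts avoids unaligned reads.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Assumed sizes and types; use the real definitions from the server headers. */
#define DEMO_BGW_EXTRALEN 128
typedef uint32_t DemoOid;
typedef uint32_t demo_bits32;

/* Compile-time check that the payload fits in bgw_extra. */
_Static_assert(2 * sizeof(DemoOid) + sizeof(demo_bits32) <= DEMO_BGW_EXTRALEN,
               "payload does not fit in bgw_extra");

/* Copy a database OID, a role OID and flags into bgw_extra, in that order. */
void
demo_pack_extra(char extra[DEMO_BGW_EXTRALEN], DemoOid dboid, DemoOid roleoid,
                demo_bits32 flags)
{
    char *p = extra;

    memcpy(p, &dboid, sizeof(DemoOid));
    p += sizeof(DemoOid);
    memcpy(p, &roleoid, sizeof(DemoOid));
    p += sizeof(DemoOid);
    memcpy(p, &flags, sizeof(demo_bits32));
}

/* Read the values back in the same order (the worker reads MyBgworkerEntry). */
void
demo_unpack_extra(const char extra[DEMO_BGW_EXTRALEN], DemoOid *dboid,
                  DemoOid *roleoid, demo_bits32 *flags)
{
    const char *p = extra;

    memcpy(dboid, p, sizeof(DemoOid));
    p += sizeof(DemoOid);
    memcpy(roleoid, p, sizeof(DemoOid));
    p += sizeof(DemoOid);
    memcpy(flags, p, sizeof(demo_bits32));
}
```

The registering side and the worker must agree on the order and sizes of the fields, because bgw_extra carries no type information.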
bgw_notify_pid is the PID of a QHB backend process to which the qhbmaster should send SIGUSR1 when the process is started or exits. It should be 0 for workers registered at qhbmaster startup time, or when the backend registering the worker does not wish to wait for the worker to start up. Otherwise, it should be initialized to MyProcPid.
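The choice of bgw_notify_pid reduces to a small rule. In the sketch below, my_pid stands in for MyProcPid, and the function and its parameters are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

/*
 * Pick bgw_notify_pid: 0 for workers registered at qhbmaster startup or
 * when the registering backend will not wait; otherwise its own PID.
 */
pid_t
demo_choose_notify_pid(bool registered_at_startup, bool will_wait, pid_t my_pid)
{
    if (registered_at_startup || !will_wait)
        return 0;
    return my_pid;   /* the qhbmaster will send SIGUSR1 to this backend */
}
```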
Once running, the process can connect to a database by calling BackgroundWorkerInitializeConnection(char *dbname, char *username, uint32 flags) or BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags). This allows the process to run transactions and queries using the SPI interface. If dbname is NULL or dboid is InvalidOid, the session is not connected to any particular database, but shared catalogs can be accessed. If username is NULL or useroid is InvalidOid, the process will run as the superuser created during qhb_bootstrap (or initdb). If BGWORKER_BYPASS_ALLOWCONN is passed in flags, the worker can connect to databases that do not allow user connections; similarly, BGWORKER_BYPASS_ROLELOGINCHECK bypasses the login check for the role used to connect. A background worker can call only one of these two functions, and only once. It is not possible to switch databases.
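The once-only rule can be modelled with a static guard. This is a hypothetical sketch: it returns a status code, whereas the real initialization functions are simply not designed to be called a second time. The NULL-dbname branch mirrors the "shared catalogs only" case.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical result codes for this sketch. */
typedef enum DemoConnResult
{
    DEMO_CONN_DATABASE,      /* connected to a specific database */
    DEMO_CONN_SHARED_ONLY,   /* no database: only shared catalogs visible */
    DEMO_CONN_ALREADY_DONE   /* a second call is not allowed */
} DemoConnResult;

static bool demo_connection_initialized = false;

/*
 * Whichever initialization call comes first wins; the worker can never
 * switch to another database afterwards.
 */
DemoConnResult
demo_initialize_connection(const char *dbname)
{
    if (demo_connection_initialized)
        return DEMO_CONN_ALREADY_DONE;
    demo_connection_initialized = true;
    return dbname == NULL ? DEMO_CONN_SHARED_ONLY : DEMO_CONN_DATABASE;
}
```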
Signals are initially blocked when control reaches the background worker's main function, and must be unblocked by it; this is to allow the process to customize its signal handlers, if necessary. Signals can be unblocked in the new process by calling BackgroundWorkerUnblockSignals and blocked by calling BackgroundWorkerBlockSignals.
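The same block, install, then unblock sequence can be shown with plain POSIX calls. The sketch below uses only sigprocmask, sigaction and raise. The demo_ functions are hypothetical and serve roughly the purpose of the worker's own setup and BackgroundWorkerUnblockSignals, not their actual implementation. A SIGHUP that arrives while signals are blocked stays pending and is delivered as soon as they are unblocked, so nothing is lost while handlers are being installed.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

/* Set by the handler and polled by the main loop (like ConfigReloadPending). */
static volatile sig_atomic_t demo_reload_pending = 0;

static void
demo_handle_sighup(int signo)
{
    (void) signo;
    demo_reload_pending = 1;   /* only async-signal-safe work in a handler */
}

/* Block every blockable signal, the state a worker finds on entry. */
int
demo_block_signals(void)
{
    sigset_t all;

    sigfillset(&all);
    return sigprocmask(SIG_BLOCK, &all, NULL);
}

/* Install handlers while signals are still blocked. */
int
demo_install_handlers(void)
{
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = demo_handle_sighup;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGHUP, &sa, NULL);
}

/* Unblock everything; pending signals are delivered before this returns. */
int
demo_unblock_signals(void)
{
    sigset_t none;

    sigemptyset(&none);
    return sigprocmask(SIG_SETMASK, &none, NULL);
}
```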
If bgw_restart_time for a background worker is configured as BGW_NEVER_RESTART, or if it exits with an exit code of 0 or is terminated by TerminateBackgroundWorker, it will be automatically unregistered by the qhbmaster on exit. Otherwise, it will be restarted after the time period configured via bgw_restart_time, or immediately if the qhbmaster reinitializes the cluster due to a backend failure. Background workers that need to suspend execution only temporarily should use an interruptible sleep rather than exiting; this can be achieved by calling WaitLatch(). Make sure the WL_QHBMASTER_DEATH flag is set when calling that function, and check the return code so that the worker exits promptly in the emergency case where qhb itself has terminated.
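The exit rules above amount to a small decision function. In this sketch the enum, the parameters and the sentinel value are assumptions, not the qhbmaster's code. The real BGW_NEVER_RESTART is defined in bgworker.h. The immediate restart on cluster reinitialization is not modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed sentinel value; the real BGW_NEVER_RESTART is defined in bgworker.h. */
#define DEMO_NEVER_RESTART (-1)

typedef enum DemoExitAction
{
    DEMO_UNREGISTER,        /* the qhbmaster forgets the worker */
    DEMO_RESTART_LATER      /* restart after bgw_restart_time seconds */
} DemoExitAction;

/*
 * exit_code is the worker's exit status; terminated_by_request is true
 * when the exit followed TerminateBackgroundWorker().
 */
DemoExitAction
demo_on_worker_exit(int restart_time, int exit_code, bool terminated_by_request)
{
    if (restart_time == DEMO_NEVER_RESTART || exit_code == 0 || terminated_by_request)
        return DEMO_UNREGISTER;
    return DEMO_RESTART_LATER;
}
```

Because an exit code of 0 unregisters the worker, a worker that only wants to pause should sleep on its latch rather than exit.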
When a background worker is registered using the RegisterDynamicBackgroundWorker function, it is possible for the backend performing the registration to obtain information regarding the status of the worker. Backends wishing to do this should pass the address of a BackgroundWorkerHandle * as the second argument to RegisterDynamicBackgroundWorker. If the worker is successfully registered, this pointer will be initialized with an opaque handle that can subsequently be passed to GetBackgroundWorkerPid(BackgroundWorkerHandle *, pid_t *) or TerminateBackgroundWorker(BackgroundWorkerHandle *). GetBackgroundWorkerPid can be used to poll the status of the worker: a return value of BGWH_NOT_YET_STARTED indicates that the worker has not yet been started by the qhbmaster; BGWH_STOPPED indicates that it has been started but is no longer running; and BGWH_STARTED indicates that it is currently running. In this last case, the PID will also be returned via the second argument. TerminateBackgroundWorker causes the qhbmaster to send SIGTERM to the worker if it is running, and to unregister it as soon as it is not running.
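A polling loop over GetBackgroundWorkerPid can be sketched with a function pointer standing in for the real call. The enum, the callback type and the fake status source are hypothetical. In a backend, the loop would sleep on its latch between polls instead of spinning.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical mirror of the statuses GetBackgroundWorkerPid can report. */
typedef enum DemoBgwStatus
{
    DEMO_BGWH_STARTED,
    DEMO_BGWH_NOT_YET_STARTED,
    DEMO_BGWH_STOPPED
} DemoBgwStatus;

/* Stand-in for GetBackgroundWorkerPid(handle, &pid). */
typedef DemoBgwStatus (*demo_get_pid_fn)(void *handle, pid_t *pid);

/*
 * Poll up to max_polls times. Returns the worker PID once it is running,
 * 0 if it has already stopped, and -1 if it never started in time.
 */
pid_t
demo_poll_worker(demo_get_pid_fn get_pid, void *handle, int max_polls)
{
    for (int i = 0; i < max_polls; i++)
    {
        pid_t pid = 0;

        switch (get_pid(handle, &pid))
        {
            case DEMO_BGWH_STARTED:
                return pid;          /* the PID is only valid in this case */
            case DEMO_BGWH_STOPPED:
                return 0;
            case DEMO_BGWH_NOT_YET_STARTED:
                break;               /* keep polling */
        }
    }
    return -1;
}

/* Fake status source for trying the loop without a server. */
DemoBgwStatus
demo_fake_get_pid(void *handle, pid_t *pid)
{
    int *calls = handle;

    if (++*calls < 3)
        return DEMO_BGWH_NOT_YET_STARTED;
    *pid = 4242;
    return DEMO_BGWH_STARTED;
}
```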
In some cases, a process which registers a background worker may wish to wait for the worker to start up. This can be accomplished by initializing bgw_notify_pid to MyProcPid and then passing the BackgroundWorkerHandle * obtained at registration time to the WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *) function. This function will block until the qhbmaster has attempted to start the background worker, or until the qhbmaster dies. If the background worker is running, the return value will be BGWH_STARTED, and the PID will be written to the provided address. Otherwise, the return value will be BGWH_STOPPED or BGWH_QHBMASTER_DIED.
A process can also wait for a background worker to shut down by passing the BackgroundWorkerHandle * obtained at registration to the WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) function. This function will block until the background worker exits or the qhbmaster dies. If the background worker exits, the return value is BGWH_STOPPED; if the qhbmaster dies, it is BGWH_QHBMASTER_DIED.
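Callers of the two wait functions usually just branch on the returned status. The sketch below uses a hypothetical enum in place of BgwHandleStatus. The startup messages follow worker_spi_launch() in the example below.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical mirror of the statuses the two wait functions can return. */
typedef enum DemoBgwStatus
{
    DEMO_BGWH_STARTED,
    DEMO_BGWH_STOPPED,
    DEMO_BGWH_QHBMASTER_DIED
} DemoBgwStatus;

/* Map a WaitForBackgroundWorkerStartup() result to an error message, or NULL. */
const char *
demo_startup_error(DemoBgwStatus status)
{
    switch (status)
    {
        case DEMO_BGWH_STARTED:
            return NULL;
        case DEMO_BGWH_STOPPED:
            return "could not start background process";
        case DEMO_BGWH_QHBMASTER_DIED:
            return "cannot start background processes without qhbmaster";
    }
    return "unexpected status";
}

/*
 * WaitForBackgroundWorkerShutdown() has only two outcomes: returns 1 if
 * the worker exited, 0 if the qhbmaster died first.
 */
int
demo_worker_has_exited(DemoBgwStatus status)
{
    assert(status == DEMO_BGWH_STOPPED || status == DEMO_BGWH_QHBMASTER_DIED);
    return status == DEMO_BGWH_STOPPED;
}
```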
Background workers can send asynchronous notification messages, either by using the NOTIFY command via SPI, or directly via Async_Notify(). Such notifications will be sent at transaction commit. Background workers should not register to receive asynchronous notifications with the LISTEN command, as there is no infrastructure for a worker to consume such notifications.
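A worker that sends NOTIFY through SPI has to build the command text itself. The hypothetical helper below doubles single quotes so the payload is a valid string literal. It assumes standard-conforming string literals and a channel name that is already a safe identifier. In module code, the server's own quoting helpers, such as quote_identifier used in the example below, are preferable to hand-rolled escaping. The resulting string would be passed to SPI_execute inside a transaction.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build "NOTIFY channel, 'payload'" into buf. Returns the number of
 * characters written, or -1 if buf is too small.
 */
int
demo_build_notify(char *buf, size_t bufsize, const char *channel,
                  const char *payload)
{
    int    n = snprintf(buf, bufsize, "NOTIFY %s, '", channel);
    size_t len;

    if (n < 0 || (size_t) n >= bufsize)
        return -1;
    len = (size_t) n;
    if (len + 2 > bufsize)          /* closing quote plus terminating NUL */
        return -1;
    for (const char *c = payload; *c != '\0'; c++)
    {
        size_t need = (*c == '\'') ? 2 : 1;

        if (len + need + 2 > bufsize)
            return -1;
        if (*c == '\'')
            buf[len++] = '\'';      /* double the quote: ' becomes '' */
        buf[len++] = *c;
    }
    buf[len++] = '\'';
    buf[len] = '\0';
    return (int) len;
}
```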
The maximum number of registered background workers is limited by max_worker_processes.
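Because the number of slots is fixed, registration can fail. A dynamic registration then returns false, and worker_spi_launch() in the example below returns NULL in that case. A hypothetical slot pool illustrates the bookkeeping.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical fixed pool; the real size comes from max_worker_processes. */
typedef struct DemoSlotPool
{
    int max_worker_processes;
    int in_use;
} DemoSlotPool;

/* Claim a slot for a new registration; false when every slot is taken. */
bool
demo_claim_slot(DemoSlotPool *pool)
{
    if (pool->in_use >= pool->max_worker_processes)
        return false;
    pool->in_use++;
    return true;
}

/* Free a slot once the qhbmaster unregisters the worker. */
void
demo_release_slot(DemoSlotPool *pool)
{
    assert(pool->in_use > 0);
    pool->in_use--;
}
```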
Example of Background Worker Process
The following sample background worker demonstrates various coding patterns: establishing a database connection; starting and committing transactions; using GUC variables and heeding SIGHUP to reread the configuration file; reporting to pg_stat_activity; and using the process latch to sleep and to exit in case of qhbmaster death.
This code connects to a database, creates a schema and a table (schemaN.counted, where N is the worker's index), and summarizes the numbers contained therein. To see it working, insert a row of type "total" with some initial value, then insert some rows of type "delta". The worker deletes the delta rows and adds their values to the total.
#include "qhb.h"
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "qhbmaster/bgworker.h"
#include "qhbmaster/interrupt.h"
#include "storage/latch.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "commands/dbcommands.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_launch);
PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg);
/* GUC variables */
static int worker_spi_naptime = 10;
static int worker_spi_total_workers = 2;
static char *worker_spi_database = NULL;
static char *worker_spi_role = NULL;
/* value cached, fetched from shared memory */
static uint32 worker_spi_wait_event_main = 0;
typedef struct worktable
{
    const char *schema;
    const char *name;
} worktable;
/*
 * Initialize workspace for a worker process: create the schema if it doesn't
 * already exist.
 */
static void
initialize_worker_spi(worktable *table)
{
    int         ret;
    int64       ntup;           /* count(*) returns bigint */
    bool        isnull;
    StringInfoData buf;
    SetCurrentStatementStartTimestamp();
    StartTransactionCommand();
    SPI_connect();
    PushActiveSnapshot(GetTransactionSnapshot());
    pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
    /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
    initStringInfo(&buf);
    appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
                     table->schema);
    debug_query_string = buf.data;
    ret = SPI_execute(buf.data, true, 0);
    if (ret != SPI_OK_SELECT)
        elog(FATAL, "SPI_execute failed: error code %d", ret);
    if (SPI_processed != 1)
        elog(FATAL, "not a singleton result");
    ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
                                       SPI_tuptable->tupdesc,
                                       1, &isnull));
    if (isnull)
        elog(FATAL, "null result");
    if (ntup == 0)
    {
        debug_query_string = NULL;
        resetStringInfo(&buf);
        appendStringInfo(&buf,
                         "CREATE SCHEMA \"%s\" "
                         "CREATE TABLE \"%s\" ("
                         " type text CHECK (type IN ('total', 'delta')), "
                         " value integer) "
                         "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
                         "WHERE type = 'total'",
                         table->schema, table->name, table->name, table->name);
        /* set statement start time */
        SetCurrentStatementStartTimestamp();
        debug_query_string = buf.data;
        ret = SPI_execute(buf.data, false, 0);
        if (ret != SPI_OK_UTILITY)
            elog(FATAL, "failed to create my schema");
        debug_query_string = NULL;  /* rest is not statement-specific */
    }
    SPI_finish();
    PopActiveSnapshot();
    CommitTransactionCommand();
    debug_query_string = NULL;
    pgstat_report_activity(STATE_IDLE, NULL);
}
void
worker_spi_main(Datum main_arg)
{
    int         index = DatumGetInt32(main_arg);
    worktable  *table;
    StringInfoData buf;
    char        name[20];
    Oid         dboid;
    Oid         roleoid;
    char       *p;
    bits32      flags = 0;
    table = palloc(sizeof(worktable));
    sprintf(name, "schema%d", index);
    table->schema = pstrdup(name);
    table->name = pstrdup("counted");
    /* fetch database and role OIDs, these are set for a dynamic worker */
    p = MyBgworkerEntry->bgw_extra;
    memcpy(&dboid, p, sizeof(Oid));
    p += sizeof(Oid);
    memcpy(&roleoid, p, sizeof(Oid));
    p += sizeof(Oid);
    memcpy(&flags, p, sizeof(bits32));
    /* Establish signal handlers before unblocking signals. */
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGTERM, die);
    /* We're now ready to receive signals */
    BackgroundWorkerUnblockSignals();
    /* Connect to our database */
    if (OidIsValid(dboid))
        BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
    else
        BackgroundWorkerInitializeConnection(worker_spi_database,
                                             worker_spi_role, flags);
    elog(LOG, "%s initialized with %s.%s",
         MyBgworkerEntry->bgw_name, table->schema, table->name);
    initialize_worker_spi(table);
    /*
     * Quote identifiers passed to us. Note that this must be done after
     * initialize_worker_spi, because that routine assumes the names are not
     * quoted.
     *
     * Note some memory might be leaked here.
     */
    table->schema = quote_identifier(table->schema);
    table->name = quote_identifier(table->name);
    initStringInfo(&buf);
    appendStringInfo(&buf,
                     "WITH deleted AS (DELETE "
                     "FROM %s.%s "
                     "WHERE type = 'delta' RETURNING value), "
                     "total AS (SELECT coalesce(sum(value), 0) as sum "
                     "FROM deleted) "
                     "UPDATE %s.%s "
                     "SET value = %s.value + total.sum "
                     "FROM total WHERE type = 'total' "
                     "RETURNING %s.value",
                     table->schema, table->name,
                     table->schema, table->name,
                     table->name,
                     table->name);
    /*
     * Main loop: do this until SIGTERM is received and processed by
     * ProcessInterrupts.
     */
    for (;;)
    {
        int         ret;
        /* First time, allocate or get the custom wait event */
        if (worker_spi_wait_event_main == 0)
            worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
        /*
         * Background workers mustn't call usleep() or any direct equivalent:
         * instead, they may wait on their process latch, which sleeps as
         * necessary, but is awakened if qhbmaster dies. That way the
         * background process goes away immediately in an emergency.
         */
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         worker_spi_naptime * 1000L,
                         worker_spi_wait_event_main);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
        /*
         * In case of a SIGHUP, just reload the configuration.
         */
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
        }
        /*
         * Start a transaction in which we can run queries. Note that each
         * StartTransactionCommand() call should be preceded by a
         * SetCurrentStatementStartTimestamp() call, which sets both the time
         * for the statement we're about to run, and also the transaction
         * start time. Also, each other query sent to SPI should probably be
         * preceded by SetCurrentStatementStartTimestamp(), so that statement
         * start time is always up to date.
         *
         * The SPI_connect() call lets us run queries through the SPI manager,
         * and the PushActiveSnapshot() call creates an "active" snapshot
         * which is necessary for queries to have MVCC data to work on.
         *
         * The pgstat_report_activity() call makes our activity visible
         * through the pgstat views.
         */
        SetCurrentStatementStartTimestamp();
        StartTransactionCommand();
        SPI_connect();
        PushActiveSnapshot(GetTransactionSnapshot());
        debug_query_string = buf.data;
        pgstat_report_activity(STATE_RUNNING, buf.data);
        /* We can now execute queries via SPI */
        ret = SPI_execute(buf.data, false, 0);
        if (ret != SPI_OK_UPDATE_RETURNING)
            elog(FATAL, "cannot select from table %s.%s: error code %d",
                 table->schema, table->name, ret);
        if (SPI_processed > 0)
        {
            bool        isnull;
            int32       val;
            val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
                                              SPI_tuptable->tupdesc,
                                              1, &isnull));
            if (!isnull)
                elog(LOG, "%s: count in %s.%s is now %d",
                     MyBgworkerEntry->bgw_name,
                     table->schema, table->name, val);
        }
        /*
         * And finish our transaction.
         */
        SPI_finish();
        PopActiveSnapshot();
        CommitTransactionCommand();
        debug_query_string = NULL;
        pgstat_report_stat(true);
        pgstat_report_activity(STATE_IDLE, NULL);
    }
    /* Not reachable */
}
/*
 * Entrypoint of this module.
 *
 * We register more than one worker process here, to demonstrate how that can
 * be done.
 */
void
_PG_init(void)
{
    BackgroundWorker worker;
    /* get the configuration */
    /*
     * These GUCs are defined even if this library is not loaded with
     * shared_preload_libraries, for worker_spi_launch().
     */
    DefineCustomIntVariable("worker_spi.naptime",
                            "Duration between each check (in seconds).",
                            NULL,
                            &worker_spi_naptime,
                            10,
                            1,
                            INT_MAX,
                            PGC_SIGHUP,
                            0,
                            NULL,
                            NULL,
                            NULL);
    DefineCustomStringVariable("worker_spi.database",
                               "Database to connect to.",
                               NULL,
                               &worker_spi_database,
                               "qhb",
                               PGC_SIGHUP,
                               0,
                               NULL, NULL, NULL);
    DefineCustomStringVariable("worker_spi.role",
                               "Role to connect with.",
                               NULL,
                               &worker_spi_role,
                               NULL,
                               PGC_SIGHUP,
                               0,
                               NULL, NULL, NULL);
    if (!process_shared_preload_libraries_in_progress)
        return;
    DefineCustomIntVariable("worker_spi.total_workers",
                            "Number of workers.",
                            NULL,
                            &worker_spi_total_workers,
                            2,
                            1,
                            100,
                            PGC_QHBMASTER,
                            0,
                            NULL,
                            NULL,
                            NULL);
    MarkGUCPrefixReserved("worker_spi");
    /* set up common data for all our workers */
    memset(&worker, 0, sizeof(worker));
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
    worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    sprintf(worker.bgw_library_name, "worker_spi");
    sprintf(worker.bgw_function_name, "worker_spi_main");
    worker.bgw_notify_pid = 0;
    /*
     * Now fill in worker-specific data, and do the actual registrations.
     *
     * bgw_extra can optionally include a database OID, a role OID and a set
     * of flags. This is left empty here to fall back to the related GUCs at
     * startup (0 for the bgworker flags).
     */
    for (int i = 1; i <= worker_spi_total_workers; i++)
    {
        snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
        snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
        worker.bgw_main_arg = Int32GetDatum(i);
        RegisterBackgroundWorker(&worker);
    }
}
/*
 * Dynamically launch an SPI worker.
 */
Datum
worker_spi_launch(PG_FUNCTION_ARGS)
{
    int32       i = PG_GETARG_INT32(0);
    Oid         dboid = PG_GETARG_OID(1);
    Oid         roleoid = PG_GETARG_OID(2);
    BackgroundWorker worker;
    BackgroundWorkerHandle *handle;
    BgwHandleStatus status;
    pid_t       pid;
    char       *p;
    bits32      flags = 0;
    ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
    Size        ndim;
    int         nelems;
    Datum      *datum_flags;
    memset(&worker, 0, sizeof(worker));
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
    worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    sprintf(worker.bgw_library_name, "worker_spi");
    sprintf(worker.bgw_function_name, "worker_spi_main");
    snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
    snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
    worker.bgw_main_arg = Int32GetDatum(i);
    /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
    worker.bgw_notify_pid = MyProcPid;
    /* extract flags, if any */
    ndim = ARR_NDIM(arr);
    if (ndim > 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("flags array must be one-dimensional")));
    if (array_contains_nulls(arr))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("flags array must not contain nulls")));
    Assert(ARR_ELEMTYPE(arr) == TEXTOID);
    deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
    for (i = 0; i < nelems; i++)
    {
        char       *optname = TextDatumGetCString(datum_flags[i]);
        if (strcmp(optname, "ALLOWCONN") == 0)
            flags |= BGWORKER_BYPASS_ALLOWCONN;
        else if (strcmp(optname, "ROLELOGINCHECK") == 0)
            flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
        else
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("incorrect flag value found in array")));
    }
    /*
     * Register database and role to use for the worker started in bgw_extra.
     * If none have been provided, this will fall back to the GUCs at startup.
     */
    if (!OidIsValid(dboid))
        dboid = get_database_oid(worker_spi_database, false);
    /*
     * worker_spi_role is NULL by default, so this gives worker_spi_main()
     * an invalid OID in this case.
     */
    if (!OidIsValid(roleoid) && worker_spi_role)
        roleoid = get_role_oid(worker_spi_role, false);
    p = worker.bgw_extra;
    memcpy(p, &dboid, sizeof(Oid));
    p += sizeof(Oid);
    memcpy(p, &roleoid, sizeof(Oid));
    p += sizeof(Oid);
    memcpy(p, &flags, sizeof(bits32));
    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
        PG_RETURN_NULL();
    status = WaitForBackgroundWorkerStartup(handle, &pid);
    if (status == BGWH_STOPPED)
        ereport(ERROR,
                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                 errmsg("could not start background process"),
                 errhint("More details may be available in the server log.")));
    if (status == BGWH_QHBMASTER_DIED)
        ereport(ERROR,
                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                 errmsg("cannot start background processes without qhbmaster"),
                 errhint("Kill all remaining database processes and restart the database.")));
    Assert(status == BGWH_STARTED);
    PG_RETURN_INT32(pid);
}
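To make the worker's query easier to follow, its effect can be mirrored in plain C over an in-memory array. DemoRow and demo_fold_deltas are hypothetical, and the sketch ignores integer overflow, which the SQL would report as an error. As in the query, whose data-modifying WITH clause always runs to completion, the delta rows are removed even when no total row exists. The unique index guarantees at most one total row.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical in-memory stand-in for one row of schemaN.counted. */
typedef struct DemoRow
{
    const char *type;   /* "total" or "delta" */
    int         value;
} DemoRow;

/*
 * Remove all delta rows (compacting the array and updating *nrows), add
 * their sum to the total row, and return 1 with *new_total set. Returns 0
 * if there is no total row.
 */
int
demo_fold_deltas(DemoRow *rows, size_t *nrows, int *new_total)
{
    int    sum = 0;
    size_t kept = 0;

    /* DELETE ... WHERE type = 'delta' RETURNING value, then sum() */
    for (size_t i = 0; i < *nrows; i++)
    {
        if (strcmp(rows[i].type, "delta") == 0)
            sum += rows[i].value;
        else
            rows[kept++] = rows[i];
    }
    *nrows = kept;

    /* UPDATE ... SET value = value + total.sum WHERE type = 'total' */
    for (size_t i = 0; i < kept; i++)
    {
        if (strcmp(rows[i].type, "total") == 0)
        {
            rows[i].value += sum;
            *new_total = rows[i].value;
            return 1;
        }
    }
    return 0;
}
```

Starting from a total of 10 and deltas of 3 and 4, one pass leaves a single total row with value 17, which matches what the worker would log.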