Archive Modules

QHB provides infrastructure to create custom modules for continuous archiving (see Section Continuous Archiving and Point-in-Time Recovery (PITR)). While archiving via a shell command (i.e., archive_command) is much simpler, a custom archive module will often be considerably more robust and performant.

When a custom archive_library is configured, QHB will submit completed WAL files to the module, and the server will avoid recycling or removing these WAL files until the module indicates that the files were successfully archived. It is ultimately up to the module to decide what to do with each WAL file, but many recommendations are listed at Section Setting Up WAL Archiving.

Archiving modules must at least consist of an initialization function (see Section Initialization Functions) and the required callbacks (see Section Archive Module Callbacks). However, archive modules are also permitted to do much more (e.g., declare GUCs and register background workers).



Initialization Functions

An archive library is loaded by dynamically loading a shared library with the archive_library's name as the library base name. The normal library search path is used to locate the library. To provide the required archive module callbacks and to indicate that the library is actually an archive module, it needs to provide a function named _PG_archive_module_init. This function is passed a struct that needs to be filled with the callback function pointers for individual actions.

typedef struct ArchiveModuleCallbacks
{
    ArchiveCheckConfiguredCB check_configured_cb;
    ArchiveFileCB archive_file_cb;
    ArchiveShutdownCB shutdown_cb;
} ArchiveModuleCallbacks;
typedef void (*ArchiveModuleInit) (struct ArchiveModuleCallbacks *cb);

Only the archive_file_cb callback is required. The others are optional.



Archive Module Callbacks

The archive callbacks define the actual archiving behavior of the module. The server will call them as required to process each individual WAL file.

Check Callback

The check_configured_cb callback is called to determine whether the module is fully configured and ready to accept WAL files (e.g., its configuration parameters are set to valid values). If no check_configured_cb is defined, the server always assumes the module is configured.

typedef bool (*ArchiveCheckConfiguredCB) (void);

If true is returned, the server will proceed with archiving the file by calling the archive_file_cb callback. If false is returned, archiving will not proceed, and the archiver will emit the following message to the server log:

WARNING:  archive_mode enabled, yet archiving is not configured

In the latter case, the server will periodically call this function, and archiving will proceed only when it returns true.


Archive Callback

The archive_file_cb callback is called to archive a single WAL file.

typedef bool (*ArchiveFileCB) (const char *file, const char *path);

If true is returned, the server proceeds as if the file was successfully archived, which may include recycling or removing the original WAL file. If false is returned, the server will keep the original WAL file and retry archiving later. file will contain just the file name of the WAL file to archive, while path contains the full path of the WAL file (including the file name).


Shutdown Callback

The shutdown_cb callback is called when the archiver process exits (e.g., after an error) or the value of archive_library changes. If no shutdown_cb is defined, no special action is taken in these situations.

typedef void (*ArchiveShutdownCB) (void);


Example

The following example demonstrates a basic archive library implementation that is roughly equivalent to the following shell command:

test ! -f /path/to/dest && cp /path/to/src /path/to/dest

One notable difference between this module and the shell command above is that this module first copies the file to a temporary destination, syncs it to disk, and then durably moves it to the final destination. Another notable difference is that if /path/to/dest already exists but has contents identical to /path/to/src, archiving will succeed, whereas the command shown above would fail. This prevents problems if a file is successfully archived and then the system crashes before a durable record of the success has been made.

#include "qhb.h"

#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>

#include "archive/archive_module.h"
#include "common/int.h"
#include "miscadmin.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "utils/guc.h"
#include "utils/memutils.h"

PG_MODULE_MAGIC;

typedef struct BasicArchiveData
{
   MemoryContext context;
} BasicArchiveData;

static char *archive_directory = NULL;

static void basic_archive_startup(ArchiveModuleState *state);
static bool basic_archive_configured(ArchiveModuleState *state);
static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
static void basic_archive_file_internal(const char *file, const char *path);
static bool check_archive_directory(char **newval, void **extra, GucSource source);
static bool compare_files(const char *file1, const char *file2);
static void basic_archive_shutdown(ArchiveModuleState *state);

static const ArchiveModuleCallbacks basic_archive_callbacks = {
   .startup_cb = basic_archive_startup,
   .check_configured_cb = basic_archive_configured,
   .archive_file_cb = basic_archive_file,
   .shutdown_cb = basic_archive_shutdown
};

/*
 * _PG_init
 *
 * Defines the module's GUC.
 */
void
_PG_init(void)
{
   DefineCustomStringVariable("basic_archive.archive_directory",
                              gettext_noop("Archive file destination directory."),
                              NULL,
                              &archive_directory,
                              "",
                              PGC_SIGHUP,
                              0,
                              check_archive_directory, NULL, NULL);

   MarkGUCPrefixReserved("basic_archive");
}

/*
 * _PG_archive_module_init
 *
 * Returns the module's archiving callbacks.
 */
const ArchiveModuleCallbacks *
_PG_archive_module_init(void)
{
   return &basic_archive_callbacks;
}

/*
 * basic_archive_startup
 *
 * Creates the module's memory context.
 */
void
basic_archive_startup(ArchiveModuleState *state)
{
   BasicArchiveData *data;

   data = (BasicArchiveData *) MemoryContextAllocZero(TopMemoryContext,
                                                        sizeof(BasicArchiveData));
   data->context = AllocSetContextCreate(TopMemoryContext,
                                         "basic_archive",
                                         ALLOCSET_DEFAULT_SIZES);
   state->private_data = (void *) data;
}

/*
 * check_archive_directory
 *
 * Checks that the provided archive directory exists.
 */
static bool
check_archive_directory(char **newval, void **extra, GucSource source)
{
   struct stat st;

     /*
      * The default value is an empty string, so we have to accept that value.
      * Our check_configured callback also checks for this and prevents
      * archiving from proceeding if it is still empty.
      */
   if (*newval == NULL || *newval[0] == '\0')
       return true;

   /*
    * Make sure the file paths won't be too long.  The docs indicate that the
    * file names to be archived can be up to 64 characters long.
    */
   if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
   {
       GUC_check_errdetail("Archive directory too long.");
       return false;
   }

   /*
    * Do a basic sanity check that the specified archive directory exists. It
    * could be removed at some point in the future, so we still need to be
    * prepared for it not to exist in the actual archiving logic.
    */
   if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
   {
       GUC_check_errdetail("Specified archive directory does not exist.");
       return false;
   }

   return true;
}

/*
 * basic_archive_configured
 *
 * Checks that archive_directory is not blank.
 */
static bool
basic_archive_configured(ArchiveModuleState *state)
{
   return archive_directory != NULL && archive_directory[0] != '\0';
}

/*
 * basic_archive_file
 *
 * Archives one file.
 */
static bool
basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
{
   sigjmp_buf  local_sigjmp_buf;
   MemoryContext oldcontext;
   BasicArchiveData *data = (BasicArchiveData *) state->private_data;
   MemoryContext basic_archive_context = data->context;

   /*
    * We run basic_archive_file_internal() in our own memory context so that
    * we can easily reset it during error recovery (thus avoiding memory
    * leaks).
    */
   oldcontext = MemoryContextSwitchTo(basic_archive_context);

   /*
    * Since the archiver operates at the bottom of the exception stack,
    * ERRORs turn into FATALs and cause the archiver process to restart.
    * However, using ereport(ERROR, ...) when there are problems is easy to
    * code and maintain.  Therefore, we create our own exception handler to
    * catch ERRORs and return false instead of restarting the archiver
    * whenever there is a failure.
    */
   if (sigsetjmp(local_sigjmp_buf, 1) != 0)
   {
       /* Since not using PG_TRY, must reset error stack by hand */
       error_context_stack = NULL;

       /* Prevent interrupts while cleaning up */
       HOLD_INTERRUPTS();

       /* Report the error and clear ErrorContext for next time */
       EmitErrorReport();
       FlushErrorState();

       /* Close any files left open by copy_file() or compare_files() */
       AtEOSubXact_Files(false, InvalidSubTransactionId, InvalidSubTransactionId);

       /* Reset our memory context and switch back to the original one */
       MemoryContextSwitchTo(oldcontext);
       MemoryContextReset(basic_archive_context);

       /* Remove our exception handler */
       PG_exception_stack = NULL;

       /* Now we can allow interrupts again */
       RESUME_INTERRUPTS();

       /* Report failure so that the archiver retries this file */
       return false;
   }

   /* Enable our exception handler */
   PG_exception_stack = &local_sigjmp_buf;

   /* Archive the file! */
   basic_archive_file_internal(file, path);

   /* Remove our exception handler */
   PG_exception_stack = NULL;

   /* Reset our memory context and switch back to the original one */
   MemoryContextSwitchTo(oldcontext);
   MemoryContextReset(basic_archive_context);

   return true;
}

static void
basic_archive_file_internal(const char *file, const char *path)
{
   char        destination[MAXPGPATH];
   char        temp[MAXPGPATH + 256];
   struct stat st;
   struct timeval tv;
   uint64      epoch;          /* milliseconds */

   ereport(DEBUG3,
           (errmsg("archiving \"%s\" via basic_archive", file)));

   snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);

   /*
    * First, check if the file has already been archived.  If it already
    * exists and has the same contents as the file we're trying to archive,
    * we can return success (after ensuring the file is persisted to disk).
    * This scenario is possible if the server crashed after archiving the
    * file but before renaming its .ready file to .done.
    *
    * If the archive file already exists but has different contents,
    * something might be wrong, so we just fail.
    */
   if (stat(destination, &st) == 0)
   {
       if (compare_files(path, destination))
       {
           ereport(DEBUG3,
                   (errmsg("archive file \"%s\" already exists with identical contents",
                           destination)));

           fsync_fname(destination, false);
           fsync_fname(archive_directory, true);

           return;
       }

       ereport(ERROR,
               (errmsg("archive file \"%s\" already exists", destination)));
   }
   else if (errno != ENOENT)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not stat file \"%s\": %m", destination)));

   /*
    * Pick a sufficiently unique name for the temporary file so that a
    * collision is unlikely.  This helps avoid problems in case a temporary
    * file was left around after a crash or another server happens to be
    * archiving to the same directory.
    */
   gettimeofday(&tv, NULL);
   if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
       pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
       elog(ERROR, "could not generate temporary file name for archiving");

   snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
            archive_directory, "archtemp", file, MyProcPid, epoch);

   /*
    * Copy the file to its temporary destination.  Note that this will fail
    * if temp already exists.
    */
   copy_file(path, temp);

   /*
    * Sync the temporary file to disk and move it to its final destination.
    * Note that this will overwrite any existing file, but this is only
    * possible if someone else created the file since the stat() above.
    */
   (void) durable_rename(temp, destination, ERROR);

   ereport(DEBUG1,
           (errmsg("archived \"%s\" via basic_archive", file)));
}

/*
 * compare_files
 *
 * Returns whether the contents of the files are the same.
 */
static bool
compare_files(const char *file1, const char *file2)
{
#define CMP_BUF_SIZE (4096)
   char        buf1[CMP_BUF_SIZE];
   char        buf2[CMP_BUF_SIZE];
   int         fd1;
   int         fd2;
   bool        ret = true;

   fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
   if (fd1 < 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not open file \"%s\": %m", file1)));

   fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
   if (fd2 < 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not open file \"%s\": %m", file2)));

   for (;;)
   {
       int         nbytes = 0;
       int         buf1_len = 0;
       int         buf2_len = 0;

       while (buf1_len < CMP_BUF_SIZE)
       {
           nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
           if (nbytes < 0)
               ereport(ERROR,
                       (errcode_for_file_access(),
                        errmsg("could not read file \"%s\": %m", file1)));
           else if (nbytes == 0)
               break;

           buf1_len += nbytes;
       }

       while (buf2_len < CMP_BUF_SIZE)
       {
           nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
           if (nbytes < 0)
               ereport(ERROR,
                       (errcode_for_file_access(),
                        errmsg("could not read file \"%s\": %m", file2)));
           else if (nbytes == 0)
               break;

           buf2_len += nbytes;
       }

       if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
       {
           ret = false;
           break;
       }
       else if (buf1_len == 0)
           break;
   }

   if (CloseTransientFile(fd1) != 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not close file \"%s\": %m", file1)));

   if (CloseTransientFile(fd2) != 0)
       ereport(ERROR,
               (errcode_for_file_access(),
                errmsg("could not close file \"%s\": %m", file2)));

   return ret;
}

/*
 * basic_archive_shutdown
 *
 * Frees our allocated state.
 */
static void
basic_archive_shutdown(ArchiveModuleState *state)
{
   BasicArchiveData *data = (BasicArchiveData *) state->private_data;
   MemoryContext basic_archive_context;

   /*
    * If we didn't get to storing the pointer to our allocated state, we
    * don't have anything to clean up.
    */
   if (data == NULL)
       return;

   basic_archive_context = data->context;
   Assert(CurrentMemoryContext != basic_archive_context);

   if (MemoryContextIsValid(basic_archive_context))
       MemoryContextDelete(basic_archive_context);
   data->context = NULL;

   /*
    * Finally, free the state.
    */
   pfree(data);
   state->private_data = NULL;
}