Example of Using QDL

General

QDL (Quantum Data Loader) is an additional module that is not included in the basic QHB distribution and is installed separately. This utility generates a table file in multithreaded mode from a description of the table structure and the corresponding data in a CSV file, bypassing the usual database mechanisms. The resulting file can then be copied into the database directory as a table file.

See Chapter QDL Data Bulk Direct Loading Module for details about the module.



Loading Data into a New Partition of an Existing Table

Suppose we have a table partitioned by date ranges, with each range covering one year. As a simplified example, take a table with three fields of types timestamp, int, and text. We currently have two partitions, with data for 2019 and 2020, and we need to load data for 2021 as quickly as possible. Let's do this using QDL.


Step 1. Generate Test Table Data

-- Partitioned table qdl_test
DROP TABLE IF EXISTS qdl_test;
CREATE TABLE qdl_test (
    timestamp_value timestamp NOT NULL,
    int_value       int NOT NULL,
    text_value      text
) PARTITION BY RANGE (timestamp_value);

-- Test data for 2019
DROP TABLE IF EXISTS qdl_test_y2019;
CREATE TABLE qdl_test_y2019 PARTITION OF qdl_test
   FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');

INSERT INTO qdl_test_y2019
SELECT to_timestamp(extract('epoch' FROM to_timestamp('2019-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value,
       round(random()*1000000)::int AS int_value,
       substr(md5(random()::text), 0, 30) AS text_value
  FROM generate_series(1,1000);    

-- Test data for 2020
DROP TABLE IF EXISTS qdl_test_y2020;
CREATE TABLE qdl_test_y2020 PARTITION OF qdl_test
   FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');

INSERT INTO qdl_test_y2020
SELECT to_timestamp(extract('epoch' FROM to_timestamp('2020-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value,
       round(random()*1000000)::int AS int_value,
       substr(md5(random()::text), 0, 30) AS text_value
  FROM generate_series(1,1000);

Step 2. Generate Data for Loading as a CSV File

For testing purposes, we generate data for 2021 in the file /tmp/qdl_data.csv using the capabilities of the COPY command. As the delimiter, we choose a semicolon (;). The same character must be used in the QDL configuration file.

Note
When loading data with QDL, the first line of the data file is assumed to contain a header with field names, so you must add the header true parameter to the COPY command when exporting to CSV. Otherwise, the first data line will be skipped in place of the header, which is unacceptable.

COPY (SELECT to_timestamp(extract('epoch' FROM to_timestamp('2021-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value
           , round(random()*1000000) AS int_value
           , substr(md5(random()::text), 0, 30) AS text_value
        FROM generate_series(1,1000))
  TO '/tmp/qdl_data.csv'
  WITH (format csv, delimiter ';', header true);
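Before handing the file to QDL, it can be useful to confirm that the export actually produced the expected header line with the chosen delimiter. A minimal sketch follows; the check_csv_header helper is our own illustration, not part of QDL:

```shell
#!/bin/sh
# Illustrative helper (not part of QDL): confirm that the first line of the
# exported CSV is exactly the expected header with the chosen delimiter.
check_csv_header() {
    expected="$1"; file="$2"
    if head -n 1 "$file" | grep -qx "$expected"; then
        echo "header ok"
    else
        echo "unexpected header in $file" >&2
        return 1
    fi
}

# Usage for the file exported above:
# check_csv_header 'timestamp_value;int_value;text_value' /tmp/qdl_data.csv
```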

Step 3. Create a Configuration File Containing the Loaded Table Description for QDL

Note that at this first step the oid parameter in the output section does not matter, so its value can be left at zero. What matters here is the delimiter and the description of the table and its columns (the parameters of the partition table). If a column value can be NULL, the keyword [nullable] must be added after the type name.

The list of data types supported during loading is provided in Section Supported Field Types.

In this example, /tmp/config.yml configuration file looks like this:

# System configuration:
general:
    # The number of threads allocated for parsing and serializing strings.
    # Note that in addition to these threads, two more are always allocated:
    # a thread for reading and a thread for writing to the file.
    threads: 2
    # Enable checksums calculating
    checksum: true

# Raw data configuration:
data_source:
    csv:
        # Column delimiter for CSV:
        delimiter: ";"

# Output data configuration:
output:
    # Object ID. Specifies an integer ID for the group of files in the target table.
    # Does not matter until the table creation step is performed
    oid: 0
    # Table ID of the table for storing data bulks
    toast_oid: 0

# Target table configuration:
table:
    # Table name. It is required for create_table, to specify the name of the table to be created.
    name: "qdl_test_y2021"
    # Columns in the "name": "type" format
    fields:
        timestamp_value : timestamp
        int_value : integer
        text_value : text [nullable]

Step 4. Checking the Integrity of the Table Description and Data in the CSV File

This step is optional, but it is worth performing so that you do not lose the results of table file generation because some row has an invalid data format. In that case, the data loading would have to be restarted from the very beginning.

The following command checks the logical integrity of the CSV file and the consistency of the data structure with the configuration file, reporting errors via the appropriate exit status.

/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv validate
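Since validate reports problems through its exit status, it is convenient to gate the rest of the pipeline on it. A sketch follows; the validate_or_abort wrapper is illustrative and not part of QDL:

```shell
#!/bin/sh
# Illustrative wrapper (not part of QDL): run the validation command passed
# as arguments and stop the load pipeline if it exits with a non-zero status.
validate_or_abort() {
    if "$@"; then
        echo "validation passed"
    else
        echo "validation failed, aborting load" >&2
        return 1
    fi
}

# Usage on a host with QDL installed:
# validate_or_abort /usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv validate
```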

Step 5. Getting and Using a Script to Create a Table and Display the OID Value

/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv create_table > /tmp/table_script.sql

The command output looks like this:

CREATE TABLE qdl_test_y2021("timestamp_value" timestamp NOT NULL, "int_value" integer NOT NULL, "text_value" text);
CHECKPOINT;
-- Main table location
SELECT pg_relation_filepath('qdl_test_y2021');
-- TOAST table location
SELECT pg_relation_filepath('pg_toast.pg_toast_' || (SELECT oid FROM pg_class WHERE relname = 'qdl_test_y2021'));

The script must be executed in the database.

The SELECT queries display paths relative to the database cluster directory. The first returns the path to the main table file; the second returns the path to the corresponding TOAST table, if any.
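The OID needed later is simply the last component of the path returned by pg_relation_filepath. A small sketch to extract it; the oid_from_filepath helper is our own:

```shell
#!/bin/sh
# Illustrative helper: extract the table OID (the last path component) from
# a pg_relation_filepath result such as base/13676/16463.
oid_from_filepath() {
    printf '%s\n' "$1" | awk -F/ '{print $NF}'
}

# Usage with psql (assuming a reachable database):
# oid_from_filepath "$(psql -At -c "SELECT pg_relation_filepath('qdl_test_y2021');")"
```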

You can find out the data directory path using the command:

SHOW data_directory;

Step 6. Generating a Table File

This is the main and most demanding operation. It is important to choose an effective value for the number of parallel threads (the threads parameter in the general section) involved in generating the table file. This value depends on the number of processors on the machine used, as well as on the capabilities of the disk subsystem. Too large a threads value can increase contention for the disks, and the possible benefit can be negated.
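One way to pick a threads value is simply to time runs with several settings. The sketch below only rewrites the threads line of a config copy and times an arbitrary command, so it can be pointed at the real qdl invocation; the benchmark_threads and run_qdl names are our own, not part of QDL:

```shell
#!/bin/sh
# Illustrative benchmark (not part of QDL): try several `threads` settings
# and report the wall-clock time of a run for each one. The command under
# test receives the patched config path as its only argument.
benchmark_threads() {
    config="$1"; shift
    for n in 1 2 4 8; do
        sed "s/^\( *threads:\).*/\1 $n/" "$config" > "${config}.bench"
        start=$(date +%s)
        "$@" "${config}.bench" >/dev/null
        echo "threads=$n elapsed=$(( $(date +%s) - start ))s"
    done
    rm -f "${config}.bench"
}

# Usage on a real host, wrapping the Step 6 command (hypothetical wrapper):
# run_qdl() { /usr/bin/qdl --config "$1" --data /tmp/qdl_data.csv insert_values --out-dir /tmp; }
# benchmark_threads /tmp/config.yml run_qdl
```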

The table OID value must be specified in the /tmp/config.yml file as the value of the oid parameter in the output section. Suppose that in the given script the pg_relation_filepath function returns the value base/13676/16463 for the first SELECT query; then we specify 16463 as the table OID. The name of the resulting table file will match this parameter. Similarly, we specify toast_oid (the TOAST table OID) if it was returned.
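Patching the returned OIDs into the YAML can be scripted. A sketch using sed follows; the set_qdl_oid helper and its rewrite-and-rename convention are our own, not part of QDL:

```shell
#!/bin/sh
# Illustrative helper: write the OIDs reported by pg_relation_filepath into
# the `oid` and `toast_oid` keys of the QDL configuration file.
set_qdl_oid() {
    config="$1"; oid="$2"; toast_oid="$3"
    sed -e "s/^\( *oid:\).*/\1 $oid/" \
        -e "s/^\( *toast_oid:\).*/\1 $toast_oid/" \
        "$config" > "${config}.tmp" && mv "${config}.tmp" "$config"
}

# Usage for the example values from this step:
# set_qdl_oid /tmp/config.yml 16463 0
```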

As a result of running the utility, files named after the specified OID will appear in the directory given by the --out-dir parameter; in this example, the file 16463. If the data size exceeds 1 GB, additional segment files with the corresponding suffixes (16463.1, 16463.2, etc.) will be created.

/usr/bin/qdl --config /tmp/config.yml --data /tmp/qdl_data.csv insert_values --out-dir /tmp

Step 7. Copying the Created File to the Database Directory Using OS Facilities

If the data processing occurs directly on the database server, the copy command might look like this:

cp /tmp/16463 <path_to_data_directory>/base/13676/16463
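Because the file is copied outside the database's control, it is prudent to verify the bytes after the copy. A sketch follows; copy_and_verify is our own helper, and PGDATA stands in for the data directory path:

```shell
#!/bin/sh
# Illustrative helper: copy the generated table file and confirm that the
# destination is byte-identical to the source before attaching it.
copy_and_verify() {
    src="$1"; dst="$2"
    cp "$src" "$dst" && cmp -s "$src" "$dst" \
        && echo "copy verified" \
        || { echo "copy failed or mismatched" >&2; return 1; }
}

# Usage for the example paths:
# copy_and_verify /tmp/16463 "$PGDATA/base/13676/16463"
```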

Step 8. Resetting the Table Data Cache and Some Auxiliary Actions

You need to execute the following command:

SELECT qhb_attach_qdl_data('qdl_test_y2021');

This will reset the table data cache, reconnect the table data files, reindex the corresponding TOAST table (if any), and register the data for the CDC (Change Data Capture) feature.


Step 9. Validating Table Data

You can ensure that the data is available on both the primary and standby servers.

SELECT * FROM qdl_test_y2021 LIMIT 10;
SELECT count(*) FROM qdl_test_y2021; -- if desired, check that the count matches the number of rows in the file

Now you can build the required indexes, if needed.

WARNING!
Note that because the data is loaded bypassing the standard mechanisms of the database, tables loaded via QDL will not be restored when recovering the database from a backup or when applying archived logs. After performing a bulk data load via QDL, it is therefore recommended to immediately take a full backup of the database. An alternative is to use incremental backups, in which the changes from the loaded data are captured automatically.


Step 10. Adding Data in a Partitioned Table

Let's make the qdl_test_y2021 table part of the qdl_test table by turning it into a partition and adding date range constraints.

ALTER TABLE qdl_test ATTACH PARTITION qdl_test_y2021
  FOR VALUES FROM ('2021-01-01') TO ('2022-01-01');

In this case, the standby will already have a prepared table qdl_test_y2021, and the command will be executed successfully.

Now a query against the qdl_test table for 2021 data accesses the new partition.

qhb=# explain select count(*) from qdl_test where timestamp_value >= to_timestamp('2021-01-01','YYYY-MM_DD');
                                           QUERY PLAN                                            
-------------------------------------------------------------------------------------------------
 Aggregate  (cost=79.79..79.80 rows=1 width=8)
   ->  Append  (cost=0.00..78.84 rows=379 width=0)
         Subplans Removed: 2
         ->  Seq Scan on qdl_test_y2021  (cost=0.00..26.95 rows=377 width=0)
               Filter: (timestamp_value >= to_timestamp('2021-01-01'::text, 'YYYY-MM_DD'::text))
(5 rows)

The query plan shows access to the partition qdl_test_y2021.


Notes on Physical Replication

When using physical replication, keep in mind that the table data on the primary server is created without generating write-ahead logs. Data copied in as a ready-made file will not be replicated to the standby servers automatically. Therefore, you must also copy the table file to the appropriate directory on each standby server. If you do not, the standby servers will only have the empty table created in Step 5.

Also note that replication may be delayed, either due to switching of write-ahead log files (if streaming replication is not used), or due to a specially configured time lag of the standby, which is used to prevent accidental data loss on the primary.

You must keep in mind the specifics of changes on physical standby servers in the following steps:

Step 5. The script commands will be executed on the standby servers automatically, possibly with some delay.
Step 7. You need to copy the prepared table file to the corresponding data directory on the standby servers as well. This step on the standby servers must be performed after the tables have been created on them.
Step 8. The above-mentioned cache reset query must also be executed on the standby servers.
Step 9. Index replication will be performed automatically. However, it is essential that by the time index replication is completed on the standby servers, all previous steps (5, 7 and 8) have been completed and the table is ready for reading data via index access.
Step 10. The script will be executed on the standby servers automatically, possibly with some delay. You may want to verify the successful execution of the operation on the standby servers.



Example of Using QDL in Case of Logical Table Replication

You often need to load bulk data into a new table whose changes should then be sent via logical replication to a separate data store. QDL can be used successfully in this situation as well. The example below uses a simple regular table, but it demonstrates all the specific aspects.


Preamble

In the case of logical replication, the terms "primary" and "standby" are used relative to the replicated table. Logical replication implies the presence of a copy of the replicated table on the standby server. When logical replication is initialized, all table data on the primary is copied in full using the standard built-in tools. To avoid copying the entire loaded data bulk from the primary to the standby, you can initialize logical replication with empty tables and then replace the contents of the tables on both the primary and standby servers. On the standby, the table will generally have its own OID, so the data file will need to be renamed accordingly when copying. You then need to reset the cached table data on both the primary and standby servers, after which replication of updates proceeds in the usual manner. Of course, the table on the primary should not be updated until the work of setting up replication and copying the data is finished.

A step-by-step example of loading and setting up logical replication of a table using QDL is provided below.


Step 1. Creating a CSV File with Data for Testing

For testing purposes, we generate data for 2021 in the file /tmp/qdl_log_repl.csv using the capabilities of the COPY command. As the delimiter, we choose a semicolon (;). The same character must be used in the QDL configuration file.

Note
When loading data with QDL, the first line of the data file is assumed to contain a header with field names, so you must add the header true parameter to the COPY command when exporting to CSV. Otherwise, the first data line will be skipped in place of the header, which is unacceptable.

We use the same script as in the previous example.

COPY (SELECT to_timestamp(extract('epoch' FROM to_timestamp('2021-01-01','yyyy-mm-dd')) + random()*60*60*24*365)::timestamp AS timestamp_value
           , round(random()*1000000) AS int_value
           , substr(md5(random()::text), 0, 30) AS text_value
        FROM generate_series(1,1000))
  TO '/tmp/qdl_log_repl.csv'
  WITH (format csv, delimiter ';', header true);

Step 2. Creating Configuration File /tmp/config_qdl_log_repl.yml Containing the Loaded Table Description for QDL

Note that at this step the oid parameter in the output section does not matter, so its value can be left at zero. What matters here is the delimiter and the description of the table and its columns. If a column value can be NULL, the keyword [nullable] must be added after the type name.

The list of data types supported during loading is provided in Section Supported Field Types.

In this example, /tmp/config_qdl_log_repl.yml configuration file looks like this:

# System configuration:
general:
    # The number of threads allocated for parsing and serializing strings.
    # Note that in addition to these threads, two more are always allocated:
    # a thread for reading and a thread for writing to the file.
    threads: 2
    # Enable checksums calculating
    checksum: true

# Raw data configuration:
data_source:
    csv:
        # Column delimiter for CSV:
        delimiter: ";"

# Output data configuration:
output:
    # Object ID. Specifies an integer ID for the group of files in the target table.
    # Does not matter until the table creation step is performed
    oid: 0
    # Table ID of the table for storing data bulks
    toast_oid: 0

# Target table configuration:
table:
    # Table name. It is required for create_table, to specify the name of the table to be created.
    name: "qdl_log_repl"
    # Columns in the "name": "type" format
    fields:
        timestamp_value : timestamp
        int_value : integer
        text_value : text [nullable]

Step 3. Checking the Integrity of the Table Description and Data in the CSV File

This step is optional, but it is worth performing so that you do not lose the results of table file generation because some row has an invalid data format. In that case, the data loading would have to be restarted from the very beginning.

The following command checks the logical integrity of the CSV file and the consistency of the data structure with the configuration file, reporting errors via the appropriate exit status.

/usr/bin/qdl --config /tmp/config_qdl_log_repl.yml --data /tmp/qdl_log_repl.csv validate

Step 4. Getting and Using a Script to Create a Table and Display the OID Value

/usr/bin/qdl --config /tmp/config_qdl_log_repl.yml --data /tmp/qdl_log_repl.csv create_table > /tmp/table_script.sql

The command output looks like this:

CREATE TABLE qdl_log_repl("timestamp_value" timestamp NOT NULL, "int_value" integer NOT NULL, "text_value" text);
CHECKPOINT;
-- Main table location
SELECT pg_relation_filepath('qdl_log_repl');
-- TOAST table location
SELECT pg_relation_filepath('pg_toast.pg_toast_' || (SELECT oid FROM pg_class WHERE relname = 'qdl_log_repl'));

The script must be executed on both the primary and standby servers.

The first SELECT query of the script displays the path to the table relative to the database cluster directory. Note that each database will display its own pg_relation_filepath value, with the table OID number at the end. You need to use this number in the oid parameter of the output section of the QDL configuration file.

At the same step, you should delete all rows from the qdl_log_repl table on the primary and standby servers:

DELETE FROM qdl_log_repl;

When replication is initialized, table rows are copied from the primary to the standby; with empty tables, this copy step is effectively a no-op.


Step 5. Setting up Logical Replication

Note that for logical replication to run successfully, the wal_level parameter on the primary must be set to logical.

Run the following script on the primary:

SELECT pg_create_logical_replication_slot('test_slot','pgoutput');
CREATE PUBLICATION test_pub FOR TABLE qdl_log_repl;

On the standby, replace <username> in the following command with a valid username in the primary server database and run it.

CREATE SUBSCRIPTION test_sub CONNECTION 'port=5432 user=<username> dbname=qhb' PUBLICATION test_pub WITH (create_slot = false, slot_name = test_slot);

Step 6. Generating a Table File

This is the main and most demanding operation. It is important to choose an effective value for the number of parallel threads (the threads parameter in the general section) involved in generating the table file. This value depends on the number of processors on the machine used, as well as on the capabilities of the disk subsystem. Too large a threads value can increase contention for the disks, and the possible benefit can be negated.

The table OID value must be specified in the /tmp/config_qdl_log_repl.yml file as the value of the oid parameter in the output section. Suppose that in the given script the pg_relation_filepath function returned the value base/13676/17101 on the primary and base/13676/18203 on the standby. Then for the primary we specify 17101 as the oid parameter. The name of the resulting table file will match this parameter.

The value on the standby is different. When copying, you will need to rename the file so that its name matches the OID on the standby. If you specify the wrong value, at best the file will simply be redundant, and at worst you may accidentally overwrite an existing table whose OID matches the OID of the table on the primary.

Then you need to run the following script:

/usr/bin/qdl --config /tmp/config_qdl_log_repl.yml --data /tmp/qdl_log_repl.csv insert_values --out-dir /tmp

As a result of running the utility, a file named after the specified OID will appear in the directory given by the --out-dir parameter; in this example, the file 17101. If the data size exceeds 1 GB, additional segment files with the corresponding suffixes (17101.1, 17101.2, etc.) will be created.


Step 7. Copying the Created File to the Database Directory on the Primary and Standby Servers Using OS Facilities

On the primary, the copy command might look like this:

cp /tmp/17101 <path to data directory on primary>/base/13676/17101

On the standby, the destination file name is different (the database OID may also differ, but it is unchanged in this example):

cp /tmp/17101 <path to data directory on standby>/base/13676/18203

Step 8. Resetting the Table Data Cache and Some Auxiliary Actions

You need to run the following command on both the primary and standby servers:

SELECT qhb_attach_qdl_data('qdl_log_repl');

This will reset the table data cache, reconnect the table data files, reindex the corresponding TOAST table (if any), and register the data for the CDC feature.


Step 9. Validating Table Data

You can ensure that the data is available on both the primary and standby servers.

SELECT * FROM qdl_log_repl LIMIT 10;
SELECT count(*) FROM qdl_log_repl; -- if desired, check that the count matches the number of rows in the file

Now you can build the required indexes, if needed.

WARNING!
Note that because the data is loaded bypassing the standard mechanisms of the database, tables loaded via QDL will not be restored when recovering the database from a backup or when applying archived logs. After performing a bulk data load via QDL, it is therefore recommended to immediately take a full backup of the database. An alternative is to use incremental backups, in which the changes from the loaded data are captured automatically.



Conclusion

Using QDL, you can efficiently load bulk data into the database, including under physical or logical replication. Testing shows that the total time to load data using QDL plus the time to copy the resulting file is 3.5 times less than the time to load the same data using the COPY command.

The approach presented can be used in data warehouses and data marts, where the time to load new data is a critical indicator and the data usually comes from other sources.