Docs

1. Introduction

ReplicaDB is primarily a command-line, portable and cross-platform tool for data replication between a source and a sink database. Its main objective is performance, implementing the specific techniques of each database engine to achieve the best performance with each of them, either as a source or as a sink.

ReplicaDB follows the convention-over-configuration design, so the user only needs to provide the minimum parameters required for the replication process; the rest take default values.

2. Basic Usage

With ReplicaDB, you can replicate data between relational databases and non-relational databases. The input to the replication process is a database table or a custom query. ReplicaDB will read the source table row by row, and the output of this replication process is a table in the sink database containing a copy of the source table. The replication process is performed in parallel.

By default, ReplicaDB will truncate the sink table before populating it with data, unless the --sink-disable-truncate argument is specified.
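For example, to keep the existing rows in the sink table you can disable truncation in an options file. This is a minimal sketch, assuming the sink.disable.truncate property follows the same naming pattern as the sink.disable.escape property shown later in this document:

# Keep existing rows: disable the default TRUNCATE of the sink table
sink.disable.truncate=true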


2.1 Replication Mode

ReplicaDB implements three replication modes: complete, complete-atomic and incremental.

Complete

The complete mode makes a complete replica of the source table, with all its data, from source to sink. In complete mode, only INSERT statements are executed on the sink table, without regard to primary keys. ReplicaDB will perform the following actions on a complete replication:

  • Truncate the sink table with the TRUNCATE TABLE statement.
  • Select and copy the data in parallel from the source table to the sink table.

Therefore, data is not available in the sink table during the replication process.

[Figure: ReplicaDB complete mode]

Complete Atomic

The complete-atomic mode performs a complete replication (DELETE and INSERT) in a single transaction, so the sink table is never empty. ReplicaDB will perform the following actions on a complete-atomic replication:

  • Automatically create the staging table in the sink database.
  • Begin a new transaction, called "T0", and delete all rows from the sink table with a DELETE FROM statement. This operation is executed in a new thread, so ReplicaDB does not wait for it to finish.
  • Select and copy the data in parallel from the source table to the sink staging table.
  • Wait until the DELETE statement of transaction "T0" is completed.
  • Using transaction "T0", move the data (with an INSERT INTO ... SELECT statement) from the sink staging table to the sink table.
  • Commit transaction "T0".
  • Drop the sink staging table.

Therefore, data remains available in the sink table during the replication process.
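As an illustration, a minimal options-file sketch of a complete-atomic replication could look like the following (connection values are placeholders; sink.staging.schema only indicates where ReplicaDB may create the staging table):

############################# ReplicaDB Basics #############################
mode=complete-atomic
jobs=4
############################# Source Options #############################
source.connect=jdbc:postgresql://localhost:5432/sourcedb
source.user=srcuser
source.password=srcpassword
source.table=public.employees
############################# Sink Options #############################
sink.connect=jdbc:postgresql://remotehost:5432/sinkdb
sink.user=sinkuser
sink.password=sinkpassword
sink.table=public.employees
# Schema with permissions to create the staging table
sink.staging.schema=public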

[Figure: ReplicaDB complete-atomic mode]

Incremental

The incremental mode performs an incremental replication of the data from the source table to the sink table. The incremental mode aims to replicate only the new data added in the source table to the sink table. This technique drastically reduces the amount of data that must be moved between both databases and becomes essential with large tables with billions of rows.

To do this, it is necessary to have a strategy for filtering the new data at the source. Usually, a date type column or a unique incremental ID is used. Therefore it will be necessary to use the source.where parameter to retrieve only the newly added rows in the source table since the last time the replica was executed.

Currently, you must store the last value of the column used to determine changes in the source table. In future versions, ReplicaDB will do this automatically.
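For example, a sketch of an incremental run filtering on a hypothetical last_updated column, using the last value saved from the previous execution, could look like this:

mode=incremental
source.table=public.sales
# last_updated is an illustrative tracking column; the literal value is the one
# stored after the previous replication run
source.where=last_updated > '2024-01-01 00:00:00'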

In incremental mode, an INSERT or UPDATE (UPSERT) technique is used on the sink table. ReplicaDB needs to create a staging table in the sink database, where data is copied in parallel. The last step of the replication is to merge the staging table with the sink table. ReplicaDB will perform the following actions in an incremental replication:

  • Automatically create the staging table in the sink database.
  • Truncate the staging table.
  • Select and copy the data in parallel from the source table to the sink staging table.
  • Get the primary keys of the sink table.
  • Execute the UPSERT statement between the sink staging table and the sink table. This statement depends on the database vendor; it can be, for example, INSERT ... ON CONFLICT ... DO UPDATE in PostgreSQL or MERGE INTO ... in Oracle.
  • Drop the sink staging table.

Therefore, data remains available in the sink table during the replication process.

[Figure: ReplicaDB incremental mode]


2.2 Controlling Parallelism

ReplicaDB replicates data in parallel from most database sources. You can specify the number of job tasks (parallel processes) to use for the replication with the -j or --jobs argument. This argument takes an integer value corresponding to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism beyond what your database can reasonably support: connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.
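In an options file, the degree of parallelism is controlled with the jobs property, equivalent to -j on the command line. For example:

# Use 8 parallel tasks for this replication
jobs=8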

3. Command Line Arguments

ReplicaDB ships with a help tool. To display a list of all available options, type the following command:

$ replicadb --help
usage: replicadb [OPTIONS]
...

Table 1. Common arguments

| Argument | Description | Default |
|----------|-------------|---------|
| --fetch-size <fetch-size> | Number of entries to read from the database at once. | 100 |
| -h,--help | Print this help screen. | |
| -j,--jobs <n> | Use n jobs to replicate in parallel. | 4 |
| --mode <mode> | Specifies the replication mode. The allowed values are complete, complete-atomic or incremental. | complete |
| --options-file <file-path> | Options file path location. | |
| --quoted-identifiers | Quote all database identifiers. | false |
| --sink-columns <col,col,col...> | Sink database table columns to be populated. | --source-columns |
| --sink-connect <jdbc-uri> | Sink database JDBC connect string. | required |
| --sink-disable-escape | Disable string escaping before populating the sink database table. | false |
| --sink-disable-truncate | Disable truncation of the sink database table before populating it. | false |
| --sink-password <password> | Sink database authentication password. | |
| --sink-staging-schema <schema-name> | Schema name in the sink database with the permissions required to create staging tables. | PUBLIC |
| --sink-staging-table <table-name> | Qualified name of the sink staging table. The table must exist in the sink database. | |
| --sink-staging-table-alias <staging-table-name-alias> | Alias name for the sink staging table. | |
| --sink-table <table-name> | Sink database table to populate. | --source-table |
| --sink-user <username> | Sink database authentication username. | |
| --source-columns <col,col,col...> | Source database table columns to be extracted. | * |
| --source-connect <jdbc-uri> | Source database JDBC connect string. | required |
| --source-password <password> | Source database authentication password. | |
| --source-query <statement> | SQL statement to be executed in the source database. | |
| --source-table <table-name> | Source database table to read. | |
| --source-user <username> | Source database authentication username. | |
| --source-where <where clause> | Source database WHERE clause to use during extraction. | |
| -v,--verbose | Print more information while working. | |
| --version | Show implementation version and exit. | |


3.1 Using Options Files to Pass Arguments

When using ReplicaDB, the command line options that do not change from invocation to invocation can be put in an options file for convenience. An options file is a Java properties text file where each line identifies an option. Options files allow a single option to span multiple lines by placing a backslash character at the end of intermediate lines. Comments are also supported and begin with the hash character; they must be specified on a new line and may not be mixed with option text. All comments and empty lines are ignored when options files are expanded.

Options files can be specified anywhere on the command line. Command line arguments override those in the options file. To specify an options file, simply create it in a convenient location and pass it to the command line via the --options-file argument.

For example, the following ReplicaDB invocation to replicate a full table into PostgreSQL can alternatively be specified as shown below:

$ replicadb --source-connect jdbc:postgresql://localhost/osalvador \
--source-table TEST \
--sink-connect jdbc:postgresql://remotehost/testdb \
--sink-user=testusr \
--sink-table TEST \
--mode complete
$ replicadb --options-file /users/osalvador/work/import.txt -j 4

where the options file /users/osalvador/work/import.txt contains the following:

source.connect=jdbc:postgresql://localhost/osalvador
source.table=TEST

sink.connect=jdbc:postgresql://remotehost/testdb
sink.user=testusr
sink.table=TEST

mode=complete


Using environment variables in options file

If you are familiar with Ant or Maven, you have most certainly already encountered the variables (like ${token}) that are automatically expanded when the configuration file is loaded. ReplicaDB supports this feature as well, here is an example:

source.connect=jdbc:postgresql://${PGHOST}/${PGDATABASE}
source.user=${PGUSER}
source.password=${PGPASSWORD}
source.table=TEST

Variables are interpolated from system properties. ReplicaDB will search for a system property with the given name and replace the variable by its value. This is a very easy means for accessing the values of system properties in the options configuration file.

Note that if a variable cannot be resolved, e.g. because the name is invalid or an unknown prefix is used, it won’t be replaced, but is returned as-is including the dollar sign and the curly braces.


3.2 Connecting to a Database Server

ReplicaDB is designed to replicate tables between databases. To do so, you must specify a connect string that describes how to connect to the database. The connect string is similar to a URL and is communicated to ReplicaDB with the --source-connect or --sink-connect arguments. This describes the server and database to connect to; it may also specify the port. For example:

$ replicadb --source-connect jdbc:mysql://database.example.com/employees

This string will connect to a MySQL database named employees on the host database.example.com.

You might need to authenticate against the database before you can access it. You can use the --source-user or --sink-user arguments to supply a username to the database.

ReplicaDB provides a couple of different ways, secure and insecure, of supplying a password to the database, which are detailed below.


Specifying extra JDBC parameters

When connecting to a database using JDBC, you can optionally specify extra JDBC parameters only via the options file. The contents of these properties are parsed as standard Java properties and passed into the driver while creating a connection.

You can specify these parameters for both the source and sink databases. ReplicaDB will retrieve all the parameters that start with source.connect.parameter. or sink.connect.parameter. followed by the name of the specific parameter of the database engine.

Examples:

# Source JDBC connection parameters
# source.connect.parameter.[parameter_name]=parameter_value
# Example for Oracle
source.connect.parameter.oracle.net.tns_admin=${TNS_ADMIN}
source.connect.parameter.oracle.net.networkCompression=on
source.connect.parameter.defaultRowPrefetch=5000
# Sink JDBC connection parameters
# sink.connect.parameter.[parameter_name]=parameter_value
# Example for PostgreSQL
sink.connect.parameter.ApplicationName=ReplicaDB
sink.connect.parameter.reWriteBatchedInserts=true


Secure way of supplying a password to the database

To supply a password securely, you must use an options file via the --options-file argument. For example:

$ replicadb --source-connect jdbc:mysql://database.example.com/employees \
--source-user boss --options-file ./conf/employee.conf

where the options file ./conf/employee.conf contains the following:

source.password=myEmployeePassword

Insecure way of supplying a password to the database

$ replicadb --source-connect jdbc:mysql://database.example.com/employees \
--source-user boss --source-password myEmployeePassword


3.3 Selecting the Data to Replicate

ReplicaDB typically replicates data in a table-centric fashion. Use the --source-table argument to select the table to replicate. For example, --source-table employees. This argument can also identify a VIEW or other table-like entity in a database.

By default, all columns within a table are selected for replication. You can select a subset of columns and control their ordering by using the --source-columns argument. This should include a comma-delimited list of columns to replicate. For example: --source-columns "name,employee_id,jobtitle".

You can control which rows are replicated by adding a SQL WHERE clause to the statement. By default, ReplicaDB generates statements of the form SELECT <column list> FROM <table name>. You can append a WHERE clause to this with the --source-where argument. For example: --source-where "id > 400". Only rows where the id column has a value greater than 400 will be replicated.
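Put together, an options-file sketch of these selection arguments could look like this (table and column names are illustrative):

source.table=employees
source.columns=name, employee_id, jobtitle
source.where=id > 400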


3.4 Free-form Query Replications

ReplicaDB can also replicate the result set of an arbitrary SQL query. Instead of using the --source-table, --source-columns and --source-where arguments, you can specify a SQL statement with the --source-query argument.

For example:

$ replicadb --source-query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id)'
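The same replication can be expressed in an options file. A short sketch, assuming the tables a and b from the command above and an explicitly named sink table (a_b_result is just a placeholder), since there is no source table for the sink table name to default to:

source.query=SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id)
sink.table=a_b_result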

4. Notes for specific connectors


4.1 CSV files Connector

The CSV File Connector uses the Apache Commons CSV library to read and write the CSV files.

To define a CSV file, set the source.connect or sink.connect parameter to a file:/ URI, as in the examples below.

############################# Source Options #############################
# Windows Paths
source.connect=file:/C:/Users/osalvador/Downloads/file.csv
source.connect=file://C:\\Users\\osalvador\\Downloads\\file.csv

# Unix Paths
source.connect=file:///Users/osalvador/Downloads/file.csv

source.columns=id, name, born
source.connect.parameter.columns.types=integer, varchar, date

############################# Sink Options #############################
# Windows Paths
sink.connect=file:/C:/Users/osalvador/Downloads/file.csv
sink.connect=file://C:\\Users\\osalvador\\Downloads\\file.csv

# Unix Paths
sink.connect=file:///Users/osalvador/Downloads/file.csv

By default, the format of the CSV file is the DEFAULT predefined format:

    delimiter=,
    quote="
    recordSeparator=\r\n
    ignoreEmptyLines=true


Note: The chosen format is taken as the base format; you can override any of its attributes by setting the remaining format parameters.


4.1.1 CSV File as Source

When defining a CSV file as a Source, you should note that the columns.types parameter is required.

This parameter defines the format of the columns in the CSV file. It should be a comma-delimited list of column data types, matching the exact number of columns in the CSV file.

For example, for a CSV file with 3 columns you would set source.connect.parameter.columns.types=integer,varchar,date.


4.1.2 Supported Data Types for CSV file as Source

You can read all columns in the CSV file as VARCHAR and ReplicaDB will store them as Strings. However, if you want standard parsing of your data, you should define the correct data type for each column.

CSV supported data Types Mapped to Java Types

| CSV Data Type | Java Type | Parser |
|---------------|-----------|--------|
| VARCHAR | String | |
| CHAR | String | |
| LONGVARCHAR | String | |
| INTEGER | int | Integer.parseInt() |
| BIGINT | long | Long.parseLong() |
| TINYINT | byte | Byte.parseByte() |
| SMALLINT | short | Short.parseShort() |
| NUMERIC | java.math.BigDecimal | new BigDecimal() |
| DECIMAL | java.math.BigDecimal | new BigDecimal() |
| DOUBLE | double | Double.parseDouble() |
| FLOAT | float | Float.parseFloat() |
| DATE | java.sql.Date | Date.valueOf() |
| TIMESTAMP | java.sql.Timestamp | Timestamp.valueOf() |
| TIME | java.sql.Time | Time.valueOf() |
| BOOLEAN | boolean | Custom boolean parser |


Custom Boolean Parser

The boolean returned represents the value true if the string argument is not null and is equal, ignoring case, to any of the strings true, yes, on, 1, t or y.


4.1.3 Predefined CSV Formats

Thanks to Apache Commons CSV you can read or write CSV files in several predefined formats or customize yours with the available parameters.

You can use these predefined formats to read or write CSV files. To define a CSV format, set the format extra parameter to any of these available formats: DEFAULT, EXCEL, INFORMIX_UNLOAD, INFORMIX_UNLOAD_CSV, MONGODB_CSV, MONGODB_TSV, MYSQL, ORACLE, POSTGRESQL_CSV, POSTGRESQL_TEXT, RFC4180, TDF.

Example

source.connect.parameter.format=RFC4180


DEFAULT

Standard Comma Separated Value format, as for RFC4180 but allowing empty lines.

Settings are:

    delimiter=,
    quote="
    recordSeparator=\r\n
    ignoreEmptyLines=true

EXCEL

The Microsoft Excel CSV format.

Excel file format (using a comma as the value delimiter). Note that the actual value delimiter used by Excel is locale dependent, so it might be necessary to customize this format to accommodate your regional settings.

Settings are:

    delimiter=,
    quote="
    recordSeparator=\r\n
    ignoreEmptyLines=false

INFORMIX_UNLOAD

Informix UNLOAD format used by the UNLOAD TO file_name operation.

This is a comma-delimited format with a LF character as the line separator. Values are not quoted and special characters are escaped with \.

Settings are:

    delimiter=,
    escape=\\
    quote="
    recordSeparator=\n

INFORMIX_UNLOAD_CSV

Informix CSV UNLOAD format used by the UNLOAD TO file_name operation (escaping is disabled.)

This is a comma-delimited format with a LF character as the line separator. Values are not quoted and escaping is disabled.

Settings are:

    delimiter=,
    quote="
    recordSeparator=\n

MONGODB_CSV

MongoDB CSV format used by the mongoexport operation.

This is a comma-delimited format. Values are double quoted only if needed and special characters are escaped with ". A header line with field names is expected.

Settings are:

    delimiter=,
    escape="
    quote="
    quoteMode=ALL_NON_NULL

MONGODB_TSV

Default MongoDB TSV format used by the mongoexport operation.

This is a tab-delimited format. Values are double quoted only if needed and special characters are escaped with ". A header line with field names is expected.

Settings are:

    delimiter=\t
    escape="
    quote="
    quoteMode=ALL_NON_NULL

MYSQL

Default MySQL format used by the SELECT INTO OUTFILE and LOAD DATA INFILE operations.

This is a tab-delimited format with a LF character as the line separator. Values are not quoted and special characters are escaped with \. The default NULL string is \N.

Settings are:

    delimiter=\t
    escape=\\
    ignoreEmptyLines=false
    quote=null
    recordSeparator=\n
    nullString=\N
    quoteMode=ALL_NON_NULL

ORACLE

Default Oracle format used by the SQL*Loader utility.

This is a comma-delimited format with the system line separator character as the record separator. Values are double quoted when needed and special characters are escaped with ". The default NULL string is "". Values are trimmed.

Settings are:

    delimiter=,
    escape=\\
    ignoreEmptyLines=false    
    quote="
    nullString=\N
    trim=true    
    quoteMode=MINIMAL

POSTGRESQL_CSV

Default PostgreSQL CSV format used by the COPY operation.

This is a comma-delimited format with a LF character as the line separator. Values are double quoted and special characters are escaped with ". The default NULL string is "".

Settings are:

    delimiter=,
    escape="
    ignoreEmptyLines=false
    quote="
    recordSeparator=\n
    nullString=""
    quoteMode=ALL_NON_NULL

POSTGRESQL_TEXT

Default PostgreSQL text format used by the COPY operation.

This is a tab-delimited format with a LF character as the line separator. Values are double quoted and special characters are escaped with ". The default NULL string is \\N.

Settings are:

    delimiter=\t
    escape=\\
    ignoreEmptyLines=false
    quote="
    recordSeparator=\n
    nullString=\\N
    quoteMode=ALL_NON_NULL

RFC4180

The comma-separated values format defined by RFC 4180.

Settings are:

    delimiter=,
    quote="
    recordSeparator=\r\n
    ignoreEmptyLines=false

TDF

A tab delimited format.

Settings are:

    delimiter=\t
    quote="
    recordSeparator=\r\n
    ignoreSurroundingSpaces=true


4.1.4 Predefined Quote Modes

You can set a predefined quote mode policy when writing a CSV file as a sink. To define a quote mode, set the format.quoteMode extra parameter to any of these available modes: ALL, ALL_NON_NULL, MINIMAL, NON_NUMERIC, NONE.

| Name | Description |
|------|-------------|
| ALL | Quotes all fields. |
| ALL_NON_NULL | Quotes all non-null fields. |
| MINIMAL | Quotes fields that contain special characters such as the field delimiter, the quote character, or any of the characters in the line separator string. |
| NON_NUMERIC | Quotes all non-numeric fields. |
| NONE | Never quotes fields. |
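For example, to quote all non-numeric fields when writing a sink CSV file:

sink.connect.parameter.format.quoteMode=NON_NUMERIC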


4.1.5 Extra parameters

The CSV connector supports the following extra parameters that can only be defined as extra connection parameters in the options-file:

| Parameter | Description | Default |
|-----------|-------------|---------|
| columns.types | Sets the column data types. This parameter is required for source CSV files. | |
| format | Sets the base predefined CSV format. | DEFAULT |
| format.delimiter | Sets the field delimiter character. | , |
| format.escape | Sets the escape character. | |
| format.quote | Sets the quote character. | " |
| format.recordSeparator | Sets the end-of-line character. This parameter only takes effect on sink CSV files. | |
| format.firstRecordAsHeader | Sets whether the first line of the file should be the header, with the names of the fields. | false |
| format.ignoreEmptyLines | Sets whether empty lines between records are ignored on source CSV files. | true |
| format.nullString | Sets the null string. | |
| format.ignoreSurroundingSpaces | Sets whether spaces around values are ignored on source CSV files. | |
| format.quoteMode | Sets the quote policy on sink CSV files. | |
| format.trim | Sets whether to trim leading and trailing blanks. | |

Complete Example for CSV File as Source and Sink

############################# ReplicaDB Basics #############################
mode=complete
jobs=1
############################# Source Options #############################
source.connect=file:///Users/osalvador/Downloads/fileSource.txt
source.connect.parameter.columns.types=integer, integer, varchar, time, float, boolean
source.connect.parameter.format=DEFAULT
source.connect.parameter.format.delimiter=|
source.connect.parameter.format.escape=\\
source.connect.parameter.format.quote="
source.connect.parameter.format.recordSeparator=\n
source.connect.parameter.format.firstRecordAsHeader=true
source.connect.parameter.format.ignoreEmptyLines=true
source.connect.parameter.format.nullString=
source.connect.parameter.format.ignoreSurroundingSpaces=true
source.connect.parameter.format.trim=true

############################# Sink Options #############################
sink.connect=file:///Users/osalvador/Downloads/fileSink.csv
sink.connect.parameter.format=RFC4180
sink.connect.parameter.format.delimiter=,
sink.connect.parameter.format.escape=\\
sink.connect.parameter.format.quote="
sink.connect.parameter.format.recordSeparator=\n
sink.connect.parameter.format.nullString=
sink.connect.parameter.format.quoteMode=non_numeric

############################# Other #############################
verbose=true


4.1.6 Replication Mode

Unlike with a database, the replication modes for a CSV file as a sink behave slightly differently:

  • complete: Create a new file. If the file exists it is overwritten with the new data.
  • incremental: Add the new data to the existing file. If the file does not exist, it creates it.

The firstRecordAsHeader=true parameter is not supported in incremental mode.

Example

############################# ReplicaDB Basics #############################
mode=complete
jobs=4

############################# Source Options #############################
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=schema.table_name

############################# Sink Options #############################
sink.connect=file:///Users/osalvador/Downloads/file.csv


4.2 Oracle Connector

To connect to an Oracle database, either as a source or a sink, you must specify a database URL string. You can review all the available options in the Oracle documentation: Oracle Database URLs and Database Specifiers.

Remember that in order to specify a connection using a TNSNames alias, you must set the oracle.net.tns_admin property as indicated in the Oracle documentation.

Example

############################# ReplicaDB Basics #############################
mode=complete
jobs=4

############################# Source Options #############################
source.connect=jdbc:oracle:thin:@MY_DATABASE_SID
source.user=orauser
source.password=orapassword
source.table=schema.table_name

source.connect.parameter.oracle.net.tns_admin=${TNS_ADMIN}
source.connect.parameter.oracle.net.networkCompression=on

############################# Sink Options #############################
...


4.3 PostgreSQL Connector

The PostgreSQL connector uses the SQL COPY command and its JDBC implementation, PostgreSQL COPY bulk data transfer, which offers great performance.

The PostgreSQL JDBC driver has much better performance if the network is not involved in the data transfer. Therefore, it is recommended to run ReplicaDB on the same machine where the PostgreSQL database resides.

In terms of monitoring and control, it is useful to identify the connection to PostgreSQL by setting the ApplicationName property.

Example

source.connect=jdbc:postgresql://host:port/db
source.user=pguser
source.password=pgpassword
source.table=schema.table_name

source.connect.parameter.ApplicationName=ReplicaDB


4.4 Denodo Connector

The Denodo connector only applies as a source since, being a data virtualization platform, Denodo is normally used only as a source.

To connect to Denodo, you can review the documentation for more information: Access Through JDBC

In terms of monitoring and control, it is useful to identify the connection to Denodo by setting the userAgent property.

Example

source.connect=jdbc:vdb://host:port/db
source.user=vdbuser
source.password=vdbpassword
source.table=schema.table_name

source.connect.parameter.userAgent=ReplicaDB


4.5 Amazon S3 Connector

Amazon Simple Storage Service (Amazon S3) provides secure, durable, highly-scalable object storage. For information about Amazon S3, see Amazon S3.

The s3 protocol is used in a URL that specifies the location of an Amazon S3 bucket and a prefix to use for writing files in the bucket.

S3 URI format: s3://S3_endpoint[:port]/bucket_name/[bucket_subfolder]


Example:

sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images

Connecting to Amazon S3 requires an AccessKey and a SecretKey provided by your Amazon S3 account. These security keys are specified as additional connection parameters.

You can use the AWS S3 connector on any system compatible with its API, such as MinIO, or other cloud providers such as DreamHost, Wasabi, or Dell EMC ECS Object Storage.
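As an illustrative sketch, the keys are passed as extra connection parameters in the options file (the values below are placeholders; see the parameter table in section 4.5.2):

sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images
sink.connect.parameter.accessKey=YOUR_ACCESS_KEY
sink.connect.parameter.secretKey=YOUR_SECRET_KEY
# Optional: whether to use SSL (HTTPS); the default is true
sink.connect.parameter.secure-connection=true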


4.5.1 Row Object Creation Type

There are two ways to create objects in Amazon S3 through the connector:

  • Generate a single CSV file for all rows of a source table
  • Generate a binary object for each row of the source table

The behavior is set through the sink.connect.parameter.row.isObject property, which can be true or false.

The purpose of this feature when sink.connect.parameter.row.isObject=true is to extract or replicate LOBs (Large Objects) from sources to individual objects in AWS S3, where for each row of the table the content of the LOB field (BLOB, CLOB, JSON, XMLTYPE, ...) becomes the payload of the object in AWS S3.

Similarly, when sink.connect.parameter.row.isObject=false, ReplicaDB generates a single CSV file for all rows in the source table and uploads it to AWS S3 as an in-memory stream, without intermediate files.


4.5.1.1 One Object Per Row

To generate an object for each row of the source table, it is necessary to set the following properties:

# Each row is a different object in s3
sink.connect.parameter.row.isObject=true
sink.connect.parameter.row.keyColumn=[The name of the source table column used as an object key in AWS S3]
sink.connect.parameter.row.contentColumn=[the name of the source table column used as a payload object of the object in AWS S3]

Example:

############################# ReplicaDB Basics #############################
mode=complete
jobs=4

############################# Source Options #############################
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=product_image
source.columns=product_id || '.jpg' as key_column, image as content_column

############################# Sink Options #############################
sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images
sink.connect.parameter.accessKey=ASDFKLJHIOVNROIUNVSD                                 
sink.connect.parameter.secretKey=naBMm7jVRyeE945m1jIIxMomoRM9rMCiEvVBtQe3

# Each row is a different object in s3
sink.connect.parameter.row.isObject=true
sink.connect.parameter.row.keyColumn=key_column
sink.connect.parameter.row.contentColumn=content_column


The following objects will be generated in AWS S3:

[Figure: objects generated in the AWS S3 bucket]


4.5.1.2 One CSV For All Rows

To generate a single CSV file for all rows of a source table, it is necessary to set the following properties:

# All rows are only one CSV object in s3
sink.connect.parameter.csv.keyFileName=[the full name of the target file or object key in AWS S3]


The generated CSV file is RFC 4180 compliant as long as you disable the default escaping, either with the --sink-disable-escape argument or in the options file:

sink.disable.escape=true


IMPORTANT: To support multi-threaded execution and since it is not possible to append content to an existing AWS S3 file, ReplicaDB will generate one file per job, renaming each file with the taskid.

Example:

############################# ReplicaDB Basics #############################
mode=complete
jobs=4

############################# Source Options #############################
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=product_description

############################# Sink Options #############################
sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images
sink.connect.parameter.accessKey=ASDFKLJHIOVNROIUNVSD                                 
sink.connect.parameter.secretKey=naBMm7jVRyeE945m1jIIxMomoRM9rMCiEvVBtQe3

# All rows are only one CSV object in s3
sink.connect.parameter.csv.keyFileName=product_description.csv
sink.disable.escape=true


The following objects will be generated in AWS S3:

[Figure: objects generated in the AWS S3 bucket]


4.5.2 Extra parameters

The Amazon S3 connector supports the following extra parameters that can only be defined as extra connection parameters in the options-file:

| Parameter | Description | Default |
|-----------|-------------|---------|
| accessKey | AWS S3 Access Key ID to access the S3 bucket. | Required |
| secretKey | AWS S3 Secret Access Key for the S3 Access Key ID to access the S3 bucket. | Required |
| secure-connection | Sets whether the connection is secure using SSL (HTTPS). | true |
| row.isObject | Sets whether each row in the source table is a different object in AWS S3. | false |
| row.keyColumn | Sets the name of the column in the source table whose content will be used as the object key (file name) in AWS S3. | Required when row.isObject=true |
| row.contentColumn | Sets the name of the column in the source table whose content will be the payload of the object in AWS S3. | Required when row.isObject=true |
| csv.keyFileName | Sets the name of the target file or object key in AWS S3. | Required when row.isObject=false |
| csv.FieldSeparator | Sets the field separator character. | , |
| csv.TextDelimiter | Sets the field enclosing character. | " |
| csv.LineDelimiter | Sets the end-of-line character. | \n |
| csv.AlwaysDelimitText | Sets whether the text should always be delimited. | false |
| csv.Header | Sets whether the first line of the file should be the header, with the names of the fields. | false |


4.6 MySQL and MariaDB Connector

Because the MariaDB JDBC driver is compatible with MySQL, and the MariaDB driver has better performance compared to the MySQL JDBC driver, in ReplicaDB we use only the MariaDB driver for both databases.

This connector uses the SQL LOAD DATA INFILE command and its JDBC implementation (see the MariaDB JDBC API Implementation Notes), which offers great performance.

ReplicaDB automatically sets these connection properties that are necessary to use the LOAD DATA INFILE command:

  • characterEncoding=UTF-8
  • allowLoadLocalInfile=true
  • rewriteBatchedStatements=true

Example

source.connect=jdbc:mysql://host:port/db
source.user=pguser
source.password=pgpassword
source.table=schema.table_name


4.7 MSSQL Server Connector

ReplicaDB uses the MSSQL Server bulk copy API to load data into the database.

Hence, depending on the source database, some data types are not supported by the MSSQL Server bulk copy, and must be cast during the replication process to avoid the Specification of length or precision 0 is invalid exception.

In addition, to avoid the exception Column xx is invalid. Please check your column mappings, it is necessary to specify all source and sink columns explicitly.

In this example, 4 columns from table t_source in PostgreSQL are replicated to t_sink in MSSQL Server. The c_numeric and c_decimal columns are converted to text, the binary column is hex encoded, and the xml type column is not supported by the bulk copy API, so it is converted to text.

Example

...
############################# Source Options ##############################
source.connect=jdbc:postgresql://localhost:5432/postgres
source.user=root
source.password=ReplicaDB_1234
source.table=public.t_source
source.columns=c_numeric::text,\
  c_decimal::text,\
  encode(c_binary_lob, 'hex'),\
  c_xml::text

############################# Sink Options ################################
sink.connect=jdbc:sqlserver://localhost:1433;database=master
sink.user=sa
sink.password=ReplicaDB_1234
sink.table=dbo.t_sink
sink.staging.schema=dbo
sink.columns=c_numeric, c_decimal, c_binary_blob, c_xml


4.8 SQLite Connector

In the SQLite connector, the complete-atomic mode is not supported because ReplicaDB performs multiple transactions on the same table and rows and cannot control the transaction isolation level.

Example

...
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
############################# Source Options ##############################
source.connect=jdbc:postgresql://localhost:5432/postgres
source.user=sa
source.password=root
source.table=t_source
############################# Sink Options ################################
sink.connect=jdbc:sqlite:/tmp/replicadb-sqlite.db
sink.table=main.t_sink
sink.staging.schema=main


4.9 MongoDB Connector

The MongoDB connector in ReplicaDB allows you to replicate data between MongoDB databases and other databases supported by ReplicaDB. The MongoDB connector uses the MongoDB Bulk API to perform the replication.

Configuration

To configure the MongoDB connector, you will need to specify the following options in your ReplicaDB configuration file.

Note that source.user, source.password, sink.user and sink.password are not supported for MongoDB; the credentials must be defined in the URI connection string in the source.connect and sink.connect options.

The source.connect and sink.connect URI string follows the MongoDB URI format:

  • mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
  • For MongoDB Atlas: mongodb+srv://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

The database name in the URI connection string is required.
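For example, a source connection string for a database named catalog might look like this (host and credentials are placeholders):

source.connect=mongodb://myuser:mypassword@localhost:27017/catalog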

Source Options

| Parameter | Description |
|-----------|-------------|
| source.connect | The MongoDB connection string with username and password. Compatible with MongoDB Atlas (mongodb+srv). |
| source.user | Not supported; define the username in the connection string. |
| source.password | Not supported; define the password in the connection string. |
| source.table | The name of the collection in the source database to replicate from. |
| source.columns | The list of columns to replicate from the source collection (a $project projection), in JSON format. |
| source.where | A query filter to apply to the source collection, in JSON format. |
| source.query | An aggregation pipeline (aggregate) to execute on the source collection, in JSON array format. |

Sink Options

| Parameter | Description |
|-----------|-------------|
| sink.connect | The MongoDB connection string with username and password. Compatible with MongoDB Atlas (mongodb+srv). |
| sink.user | Not supported; define the username in the connection string. |
| sink.password | Not supported; define the password in the connection string. |
| sink.table | The name of the collection in the sink database to replicate to. |

Incremental Replication

The MongoDB connector supports incremental replication using the source.query or source.where options. To enable incremental replication, you will need to specify a source.query aggregation pipeline that includes a $match stage to filter the documents to be replicated or a source.where query filter.

The incremental mode for MongoDB as sink database uses the $merge statement to update the documents in the sink collection. The $merge statement requires a unique index on some field in the sink collection. If the sink collection does not have a unique index, ReplicaDB will throw an exception.

Note that the $merge stage is supported since MongoDB 4.2 and later.

For example, to replicate all documents in the source_collection with a timestamp field greater than or equal to the current time, you could use the following configuration:

Using source.query

mode=incremental
source.query=[{$match: {timestamp: {$gte: new Date()}}},{$sort: {timestamp: 1}}]
source.table=source_collection
sink.table=sink_collection

Using source.where

mode=incremental
source.where={timestamp: {$gte: new Date()}}
source.table=source_collection
sink.table=sink_collection

Examples

1. Replicate all documents from a collection in a local MongoDB instance to a collection in a remote MongoDB Atlas instance:

######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
############################# Source Options ##############################
source.connect=mongodb://root:password@127.0.0.1/catalog
source.table=products
############################# Sink Options ################################
sink.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog?retryWrites=true&w=majority
sink.table=products

2. Replicate a subset of documents from a collection in a local MongoDB instance to a collection in a remote MongoDB instance, using the source.query option to specify a MongoDB aggregation pipeline:

######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
############################# Source Options ##############################
source.connect=mongodb://root:password@127.0.0.1/catalog
source.table=products
source.query=[{ $match : {startDate:{$gte:ISODate("2021-01-01T00:00:00.000Z")}} } \
  ,{$sort:{startDate:1}} \
  ,{$project: {_id:0, shopId:1, productId:1, price:1, startDate:1} } ]
############################# Sink Options ################################
sink.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog?retryWrites=true&w=majority
sink.table=products

3. Replicate a table from a local Postgres database to a collection in a remote MongoDB Atlas instance:

######################## ReplicadB General Options ########################
mode=complete
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=jdbc:postgresql://localhost:5432/postgres
source.user=user
source.password=password
source.table=t_roles
# Rename source columns to match mongodb collection sink fields
source.columns=id as "rolId", txt_code as code, txt_role_name as "roleName" \
  , cod_v_usuario_creacion as "createdBy", fec_dt_creacion as "createdAt" \
  , cod_v_usuario_modificacion as "updatedBy", fec_dt_modificacion as "updatedAt"
############################# Sink Options ################################
sink.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog?retryWrites=true&w=majority
sink.table=roles

4. Replicate a collection from a remote MongoDB Atlas instance to a local Postgres database.

It’s mandatory to specify the source.columns projection and sink.columns options to match the source collection fields to the sink table columns.

######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog
source.table=roles
source.columns={_id:0,rolId:1,roleName:1,createdBy:1,createdAt:1,updatedBy:1,updatedAt:1}
source.where={createdAt:{$gte:ISODate("2021-01-01T00:00:00.000Z")}}
############################# Sink Options ################################
sink.connect=jdbc:postgresql://localhost:5432/postgres
sink.user=user
sink.password=password
sink.table=t_roles
sink.columns=id, txt_role_name, cod_v_usuario_creacion, fec_dt_creacion, cod_v_usuario_modificacion, fec_dt_modificacion

5. Replicate a collection from a remote MongoDB Atlas instance to a local Postgres database using the Postgres jsonb data type and projecting some fields from the source collection.

It’s mandatory to specify the source.columns projection and sink.columns options to match the source collection fields to the sink table columns.

######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog
source.table=products
source.columns={_id:0,sku:1, document:'$$ROOT'}
############################# Sink Options ################################
sink.connect=jdbc:postgresql://localhost:5432/postgres
sink.user=user
sink.password=password
sink.table=products_jsonb
sink.columns=sku, document

You can also replicate from Postgres jsonb to MongoDB using the jsonb data type. ReplicaDB will automatically convert the jsonb data type to a MongoDB document.