Docs
- 1. Introduction
- 2. Basic Usage
- 3. Command Line Arguments
- 4. Notes for specific connectors
1. Introduction
ReplicaDB is primarily a command-line, portable and cross-platform tool for data replication between a source and a sink database. Its main objective is performance, implementing database engine-specific techniques to achieve the best performance for each engine, whether it acts as a source or as a sink.
ReplicaDB follows the convention-over-configuration design, so the user only has to provide the minimum parameters necessary for the replication process; the rest take default values.
2. Basic Usage
With ReplicaDB, you can replicate data between relational and non-relational databases. The input to the replication process is a database table or a custom query. ReplicaDB reads the source table row by row, and the output of the replication process is a table in the sink database containing a copy of the source table. The replication process is performed in parallel.
By default, ReplicaDB will truncate the sink table before populating it with data, unless the --sink-disable-truncate option is set to true.
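For example, a minimal invocation that copies a table between two PostgreSQL databases, relying on defaults for everything else, might look like this (connection strings and table names are placeholders):
$ replicadb --source-connect jdbc:postgresql://localhost/sourcedb \
--source-table employees \
--sink-connect jdbc:postgresql://remotehost/sinkdb \
--sink-table employees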
2.1 Replication Mode
ReplicaDB implements three replication modes: complete, complete-atomic and incremental.
Complete
The complete mode makes a complete replica of the source table, with all its data, from source to sink. In complete mode, only INSERT statements are executed against the sink table, without worrying about primary keys. ReplicaDB will perform the following actions in a complete replication:
- Truncate the sink table with the TRUNCATE TABLE statement.
- Select and copy the data in parallel from the source table to the sink table.
So data is not available in the Sink Table during the replication process.
Complete Atomic
The complete-atomic mode performs a complete replication (DELETE and INSERT) in a single transaction, allowing the sink table to never be empty. ReplicaDB will perform the following actions in a complete-atomic replication:
- Automatically create the staging table in the sink database.
- Begin a new transaction, called "T0", and delete the contents of the sink table with a DELETE FROM statement. This operation is executed in a new thread, so ReplicaDB does not wait for it to finish.
- Select and copy the data in parallel from the source table to the sink staging table.
- Wait until the DELETE statement of transaction "T0" is completed.
- Using transaction "T0", move the data (with an INSERT INTO ... SELECT statement) from the sink staging table to the sink table.
- Commit transaction "T0".
- Drop the sink staging table.
So data is available in the Sink Table during the replication process.
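As a minimal sketch (connection details, table names and the staging schema are placeholders), a complete-atomic replication could be configured in an options file (see section 3.1) like this:
mode=complete-atomic
jobs=4
source.connect=jdbc:postgresql://sourcehost/sourcedb
source.table=employees
sink.connect=jdbc:postgresql://sinkhost/sinkdb
sink.table=employees
sink.staging.schema=public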
Incremental
The incremental mode performs an incremental replication of the data from the source table to the sink table. The incremental mode aims to replicate only the new data added to the source table since the last replication. This technique drastically reduces the amount of data that must be moved between the two databases and becomes essential for large tables with billions of rows.
To do this, you need a strategy for filtering the new data at the source. Usually a date-type column or a unique incremental ID is used. Therefore, it is necessary to use the source.where parameter to retrieve only the rows added to the source table since the last time the replica was executed.
Currently, you must store the last value of the column used to determine changes in the source table. In future versions, ReplicaDB will do this automatically.
In incremental mode, an INSERT OR UPDATE (UPSERT) technique is used on the sink table. ReplicaDB needs to create a staging table in the sink database, where data is copied in parallel. The last step of the replication is to merge the staging table with the sink table. ReplicaDB will perform the following actions in an incremental replication:
- Automatically create the staging table in the sink database.
- Truncate the staging table.
- Select and copy the data in parallel from the source table to the sink staging table.
- Get the primary keys of the sink table.
- Execute the UPSERT statement between the sink staging table and the sink table. This statement depends on the database vendor; it can be, for example, INSERT ... ON CONFLICT ... DO UPDATE in PostgreSQL or MERGE INTO ... in Oracle.
- Drop the sink staging table.
So data is available in the Sink Table during the replication process.
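As a minimal sketch (connection details, table names and the filter column are placeholders), an incremental replication driven by source.where could be configured in an options file like this:
mode=incremental
jobs=4
source.connect=jdbc:postgresql://sourcehost/sourcedb
source.table=employees
source.where=updated_at > '2023-01-01 00:00:00'
sink.connect=jdbc:postgresql://sinkhost/sinkdb
sink.table=employees
sink.staging.schema=public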
2.2 Controlling Parallelism
ReplicaDB replicates data in parallel from most database sources. You can specify the number of job tasks (parallel processes) to use for the replication with the -j or --jobs argument. Each of these arguments takes an integer value corresponding to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism beyond what your database can reasonably support; connecting 100 concurrent clients to your database may increase the load on the database server to the point where performance suffers as a result.
3. Command Line Arguments
ReplicaDB ships with a help tool. To display a list of all available options, type the following command:
$ replicadb --help
usage: replicadb [OPTIONS]
...
Table 1. Common arguments
Argument | Description | Default |
---|---|---|
--fetch-size <fetch-size> | Number of entries to read from the database at once. | 100 |
-h, --help | Print this help screen | |
-j, --jobs <n> | Use n jobs to replicate in parallel. | 4 |
--mode <mode> | Specifies the replication mode. The allowed values are complete, complete-atomic or incremental | complete |
--options-file <file-path> | Options file path location | |
--quoted-identifiers | Should all database identifiers be quoted. | false |
--sink-columns <col,col,col...> | Sink database table columns to be populated | --source-columns |
--sink-connect <jdbc-uri> | Sink database JDBC connect string | required |
--sink-disable-escape | Disable escaping of strings before populating the sink database table. | false |
--sink-disable-truncate | Disable truncation of the sink database table before populating it. | false |
--sink-password <password> | Sink database authentication password | |
--sink-staging-schema <schema-name> | Schema name in the sink database, with the right permissions for creating staging tables. | PUBLIC |
--sink-staging-table <table-name> | Qualified name of the sink staging table. The table must exist in the sink database. | |
--sink-staging-table-alias <staging-table-name-alias> | Alias name for the sink staging table. | |
--sink-table <table-name> | Sink database table to populate | --source-table |
--sink-user <username> | Sink database authentication username | |
--source-columns <col,col,col...> | Source database table columns to be extracted | * |
--source-connect <jdbc-uri> | Source database JDBC connect string | required |
--source-password <password> | Source database authentication password | |
--source-query <statement> | SQL statement to be executed in the source database | |
--source-table <table-name> | Source database table to read | |
--source-user <username> | Source database authentication username | |
--source-where <where clause> | Source database WHERE clause to use during extraction | |
-v, --verbose | Print more information while working | |
--version | Show implementation version and exit | |
3.1 Using Options Files to Pass Arguments
When using ReplicaDB, command-line options that do not change from invocation to invocation can be put in an options file for convenience. An options file is a Java properties text file where each line identifies an option. Options files allow specifying a single option on multiple lines by using the backslash character at the end of intermediate lines. Comments are also supported within options files and begin with the hash character; they must be specified on a new line and may not be mixed with option text. All comments and empty lines are ignored when options files are expanded.
Options files can be specified anywhere on the command line. Command-line arguments override those in the options file. To specify an options file, simply create an options file in a convenient location and pass it to the command line via the --options-file argument.
For example, the following ReplicaDB invocation, which replicates a full table into PostgreSQL, can alternatively be specified as shown below:
$ replicadb --source-connect jdbc:postgresql://localhost/osalvador \
--source-table TEST \
--sink-connect jdbc:postgresql://remotehost/testdb \
--sink-user=testusr \
--sink-table TEST \
--mode complete
$ replicadb --options-file /users/osalvador/work/import.txt -j 4
where the options file /users/osalvador/work/import.txt contains the following:
source.connect=jdbc:postgresql://localhost/osalvador
source.table=TEST
sink.connect=jdbc:postgresql://remotehost/testdb
sink.user=testusr
sink.table=TEST
mode=complete
Using environment variables in options file
If you are familiar with Ant or Maven, you have most certainly already encountered variables (like ${token}) that are automatically expanded when the configuration file is loaded. ReplicaDB supports this feature as well. Here is an example:
source.connect=jdbc:postgresql://${PGHOST}/${PGDATABASE}
source.user=${PGUSER}
source.password=${PGPASSWORD}
source.table=TEST
Variables are interpolated from system properties. ReplicaDB will search for a system property with the given name and replace the variable by its value. This is a very easy means for accessing the values of system properties in the options configuration file.
Note that if a variable cannot be resolved, e.g. because the name is invalid or an unknown prefix is used, it won’t be replaced, but is returned as-is including the dollar sign and the curly braces.
3.2 Connecting to a Database Server
ReplicaDB is designed to replicate tables between databases. To do so, you must specify a connect string that describes how to connect to the database. The connect string is similar to a URL and is communicated to ReplicaDB with the --source-connect or --sink-connect arguments. This describes the server and database to connect to; it may also specify the port. For example:
$ replicadb --source-connect jdbc:mysql://database.example.com/employees
This string will connect to a MySQL database named employees on the host database.example.com.
You might need to authenticate against the database before you can access it. You can use the --source-user or --sink-user arguments to supply a username to the database.
ReplicaDB provides a couple of different ways to supply a password to the database, secure and non-secure, which are detailed below.
Specifying extra JDBC parameters
When connecting to a database using JDBC, you can optionally specify extra JDBC parameters only via the options file. The contents of these properties are parsed as standard Java properties and passed into the driver while creating a connection.
You can specify these parameters for both the source and sink databases. ReplicaDB will retrieve all the parameters that start with source.connect.parameter. or sink.connect.parameter. followed by the name of the specific parameter of the database engine.
Examples:
# Source JDBC connection parameters
# source.connect.parameter.[parameter_name]=parameter_value
# Example for Oracle
source.connect.parameter.oracle.net.tns_admin=${TNS_ADMIN}
source.connect.parameter.oracle.net.networkCompression=on
source.connect.parameter.defaultRowPrefetch=5000
# Sink JDBC connection parameters
# sink.connect.parameter.[parameter_name]=parameter_value
# Example for PostgreSQL
sink.connect.parameter.ApplicationName=ReplicaDB
sink.connect.parameter.reWriteBatchedInserts=true
Secure way of supplying a password to the database
To supply a password securely, you must use an options file via the --options-file argument. For example:
$ replicadb --source-connect jdbc:mysql://database.example.com/employees \
--source-user boss --options-file ./conf/employee.conf
where the options file ./conf/employee.conf contains the following:
source.password=myEmployeePassword
Insecure way of supplying a password to the database
$ replicadb --source-connect jdbc:mysql://database.example.com/employees \
--source-user boss --source-password myEmployeePassword
3.3 Selecting the Data to Replicate
ReplicaDB typically replicates data in a table-centric fashion. Use the --source-table argument to select the table to replicate, for example --source-table employees. This argument can also identify a VIEW or other table-like entity in a database.
By default, all columns within a table are selected for replication. You can select a subset of columns and control their ordering by using the --source-columns argument. This should be a comma-delimited list of columns to replicate, for example --source-columns "name,employee_id,jobtitle".
You can control which rows are replicated by adding a SQL WHERE clause to the statement. By default, ReplicaDB generates statements of the form SELECT <column list> FROM <table name>. You can append a WHERE clause to this with the --source-where argument, for example --source-where "id > 400". Only rows where the id column has a value greater than 400 will be replicated.
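Putting these options together, a hypothetical invocation that replicates a filtered subset of columns might look like this (connection strings and table names are placeholders):
$ replicadb --source-connect jdbc:postgresql://localhost/sourcedb \
--source-table employees \
--source-columns "name,employee_id,jobtitle" \
--source-where "id > 400" \
--sink-connect jdbc:postgresql://remotehost/sinkdb \
--sink-table employees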
3.4 Free-form Query Replications
ReplicaDB can also replicate the result set of an arbitrary SQL query. Instead of using the --source-table, --source-columns and --source-where arguments, you can specify a SQL statement with the --source-query argument.
For example:
$ replicadb --source-query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id)'
4. Notes for specific connectors
- 4.1 CSV files Connector
- 4.2 Oracle Connector
- 4.3 PostgreSQL Connector
- 4.4 Denodo Connector
- 4.5 Amazon S3 Connector
- 4.6 MySQL and MariaDB Connector
- 4.7 MSSQL Server Connector
- 4.8 SQLite Connector
- 4.9 MongoDB Connector
4.1 CSV files Connector
The CSV File Connector uses the Apache Commons CSV library to read and write the CSV files.
To define a CSV file, set the source-connect or sink-connect parameter to a file:/... URL.
############################# Source Options #############################
# Windows Paths
source.connect=file:/C:/Users/osalvador/Downloads/file.csv
source.connect=file://C:\\Users\\osalvador\\Downloads\\file.csv
# Unix Paths
source.connect=file:///Users/osalvador/Downloads/file.csv
source.columns=id, name, born
source.connect.parameter.columns.types=integer, varchar, date
############################# Sink Options #############################
# Windows Paths
sink.connect=file:/C:/Users/osalvador/Downloads/file.csv
sink.connect=file://C:\\Users\\osalvador\\Downloads\\file.csv
# Unix Paths
sink.connect=file:///Users/osalvador/Downloads/file.csv
By default, the format of the CSV file is the DEFAULT predefined format:
delimiter=,
quote="
recordSeparator=\r\n
ignoreEmptyLines=true
Note: The format is taken as the base format; you can modify any of its attributes by setting the remaining format parameters.
4.1.1 CSV File as Source
When defining a CSV file as a source, note that the columns.types parameter is required.
This parameter defines the format of the columns in the CSV file. It should be a comma-delimited list of column data types, matching the exact number of columns in the CSV file.
For example, you would set source.connect.parameter.columns.types=integer,varchar,date for a CSV file with 3 columns.
4.1.2 Supported Data Types for CSV file as Source
You can read all columns in the CSV file as VARCHAR and ReplicaDB will store them as String. But if you want standard parsing of your data, you should define the correct data type for each column.
CSV supported data Types Mapped to Java Types
CSV Data Type | Java Type | Parser |
---|---|---|
VARCHAR | String | |
CHAR | String | |
LONGVARCHAR | String | |
INTEGER | int | Integer.parseInt() |
BIGINT | long | Long.parseLong() |
TINYINT | byte | Byte.parseByte() |
SMALLINT | short | Short.parseShort() |
NUMERIC | java.math.BigDecimal | new BigDecimal() |
DECIMAL | java.math.BigDecimal | new BigDecimal() |
DOUBLE | double | Double.parseDouble() |
FLOAT | float | Float.parseFloat() |
DATE | java.sql.Date | Date.valueOf() |
TIMESTAMP | java.sql.Timestamp | Timestamp.valueOf() |
TIME | java.sql.Time | Time.valueOf() |
BOOLEAN | boolean | Custom Boolean Parser |
Custom Boolean Parser
The boolean returned represents the value true if the string argument is not null and is equal, ignoring case, to any of the strings true, yes, on, 1, t or y.
4.1.3 Predefined CSV Formats
Thanks to Apache Commons CSV, you can read or write CSV files in several predefined formats or customize your own with the available parameters.
You can use these predefined formats to read or write CSV files. To define a CSV format, set the format extra parameter to any of these available formats: DEFAULT, EXCEL, INFORMIX_UNLOAD, INFORMIX_UNLOAD_CSV, MONGO_CSV, MONGO_TSV, MYSQL, ORACLE, POSTGRESQL_CSV, POSTGRESQL_TEXT, RFC4180, TDF.
Example
source.connect.parameter.format=RFC4180
DEFAULT
Standard Comma Separated Value format, as for RFC4180 but allowing empty lines.
Settings are:
delimiter=,
quote="
recordSeparator=\r\n
ignoreEmptyLines=true
EXCEL
The Microsoft Excel CSV format.
Excel file format (using a comma as the value delimiter). Note that the actual value delimiter used by Excel is locale dependent; it might be necessary to customize this format to accommodate your regional settings.
Settings are:
delimiter=,
quote="
recordSeparator=\r\n
ignoreEmptyLines=false
INFORMIX_UNLOAD
Informix UNLOAD format used by the UNLOAD TO file_name operation.
This is a comma-delimited format with a LF character as the line separator. Values are not quoted and special characters are escaped with \.
Settings are:
delimiter=,
escape=\\
quote="
recordSeparator=\n
INFORMIX_UNLOAD_CSV
Informix CSV UNLOAD format used by the UNLOAD TO file_name operation (escaping is disabled).
This is a comma-delimited format with a LF character as the line separator. Values are not quoted and special characters are escaped with \.
Settings are:
delimiter=,
quote="
recordSeparator=\n
MONGO_CSV
MongoDB CSV format used by the mongoexport operation.
This is a comma-delimited format. Values are double quoted only if needed and special characters are escaped with ". A header line with field names is expected.
Settings are:
delimiter=,
escape="
quote="
quoteMode=ALL_NON_NULL
MONGODB_TSV
Default MongoDB TSV format used by the mongoexport operation.
This is a tab-delimited format. Values are double quoted only if needed and special characters are escaped with ". A header line with field names is expected.
Settings are:
delimiter=\t
escape="
quote="
quoteMode=ALL_NON_NULL
MYSQL
Default MySQL format used by the SELECT INTO OUTFILE and LOAD DATA INFILE operations.
This is a tab-delimited format with a LF character as the line separator. Values are not quoted and special characters are escaped with \. The default NULL string is \N.
Settings are:
delimiter=\t
escape=\\
ignoreEmptyLines=false
quote=null
recordSeparator=\n
nullString=\N
quoteMode=ALL_NON_NULL
ORACLE
Default Oracle format used by the SQL*Loader utility.
This is a comma-delimited format with the system line separator character as the record separator. Values are double quoted when needed and special characters are escaped with ". The default NULL string is "". Values are trimmed.
Settings are:
delimiter=,
escape=\\
ignoreEmptyLines=false
quote="
nullString=\N
trim=true
quoteMode=MINIMAL
POSTGRESQL_CSV
Default PostgreSQL CSV format used by the COPY operation.
This is a comma-delimited format with a LF character as the line separator. Values are double quoted and special characters are escaped with ". The default NULL string is "".
Settings are:
delimiter=,
escape="
ignoreEmptyLines=false
quote="
recordSeparator=\n
nullString=""
quoteMode=ALL_NON_NULL
POSTGRESQL_TEXT
Default PostgreSQL text format used by the COPY operation.
This is a tab-delimited format with a LF character as the line separator. Values are double quoted and special characters are escaped with ". The default NULL string is \\N.
Settings are:
delimiter=\t
escape=\\
ignoreEmptyLines=false
quote="
recordSeparator=\n
nullString=\\N
quoteMode=ALL_NON_NULL
RFC4180
The comma separated format defined by RFC 4180.
Settings are:
delimiter=,
quote="
recordSeparator=\r\n
ignoreEmptyLines=false
TDF
A tab delimited format.
Settings are:
delimiter=\t
quote="
recordSeparator=\r\n
ignoreSurroundingSpaces=true
4.1.4 Predefined Quote Modes
You can set a predefined quote mode policy when writing a CSV file as a sink. To define a quote mode, set the format.quoteMode extra parameter to any of these available values: ALL, ALL_NON_NULL, MINIMAL, NON_NUMERIC, NONE.
Name | Description |
---|---|
ALL | Quotes all fields. |
ALL_NON_NULL | Quotes all non-null fields. |
MINIMAL | Quotes fields which contain special characters such as the field delimiter, quote character, or any of the characters in the line separator string. |
NON_NUMERIC | Quotes all non-numeric fields. |
NONE | Never quotes fields. |
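For example, to quote only those fields that contain special characters when writing a sink CSV file, you could set the following extra parameter in the options file (a minimal sketch):
sink.connect.parameter.format.quoteMode=MINIMAL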
4.1.5 Extra parameters
The CSV connector supports the following extra parameters that can only be defined as extra connection parameters in the options-file:
Parameter | Description | Default |
---|---|---|
columns.types | Sets the column data types. This parameter is required for source CSV files | |
format | Sets the base predefined CSV format | DEFAULT |
format.delimiter | Sets the field delimiter character | , |
format.escape | Sets the escape character | |
format.quote | Sets the quote character | " |
format.recordSeparator | Sets the end-of-line character. This parameter only takes effect on sink CSV files | |
format.firstRecordAsHeader | Sets whether the first line of the file should be the header, with the names of the fields | false |
format.ignoreEmptyLines | Sets whether empty lines between records are ignored on source CSV files | true |
format.nullString | Sets the null string | |
format.ignoreSurroundingSpaces | Sets whether spaces around values are ignored on source CSV files | |
format.quoteMode | Sets the quote policy on sink CSV files | |
format.trim | Sets whether to trim leading and trailing blanks | |
Complete Example for CSV File as Source and Sink
############################# ReplicaDB Basics #############################
mode=complete
jobs=1
############################# Source Options #############################
source.connect=file:///Users/osalvador/Downloads/fileSource.txt
source.connect.parameter.columns.types=integer, integer, varchar, time, float, boolean
source.connect.parameter.format=DEFAULT
source.connect.parameter.format.delimiter=|
source.connect.parameter.format.escape=\\
source.connect.parameter.format.quote="
source.connect.parameter.format.recordSeparator=\n
source.connect.parameter.format.firstRecordAsHeader=true
source.connect.parameter.format.ignoreEmptyLines=true
source.connect.parameter.format.nullString=
source.connect.parameter.format.ignoreSurroundingSpaces=true
source.connect.parameter.format.trim=true
############################# Sink Options #############################
sink.connect=file:///Users/osalvador/Downloads/fileSink.csv
sink.connect.parameter.format=RFC4180
sink.connect.parameter.format.delimiter=,
sink.connect.parameter.format.escape=\\
sink.connect.parameter.format.quote="
sink.connect.parameter.format.recordSeparator=\n
sink.connect.parameter.format.nullString=
sink.connect.parameter.format.quoteMode=non_numeric
############################# Other #############################
verbose=true
4.1.6 Replication Mode
Unlike with a database, the replication mode for a CSV file as a sink behaves slightly differently:
- complete: Create a new file. If the file exists it is overwritten with the new data.
- incremental: Add the new data to the existing file. If the file does not exist, it creates it.
The firstRecordAsHeader=true parameter is not supported in incremental mode.
Example
############################# ReplicaDB Basics #############################
mode=complete
jobs=4
############################# Source Options #############################
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=schema.table_name
############################# Sink Options #############################
sink.connect=file:///Users/osalvador/Downloads/file.csv
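For an incremental run that appends to the existing CSV file instead of overwriting it, the same configuration could simply switch the mode (a sketch; remember that firstRecordAsHeader=true is not supported in this mode):
mode=incremental
jobs=4
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=schema.table_name
sink.connect=file:///Users/osalvador/Downloads/file.csv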
4.2 Oracle Connector
To connect to an Oracle database, either as a source or sink, you must specify a database URL string. You can review all available options in the Oracle documentation: Oracle Database URLs and Database Specifiers.
Remember that in order to specify a connection using a TNSNames alias, you must set the oracle.net.tns_admin property as indicated in the Oracle documentation.
Example
############################# ReplicaDB Basics #############################
mode=complete
jobs=4
############################# Source Options #############################
source.connect=jdbc:oracle:thin:@MY_DATABASE_SID
source.user=orauser
source.password=orapassword
source.table=schema.table_name
source.connect.parameter.oracle.net.tns_admin=${TNS_ADMIN}
source.connect.parameter.oracle.net.networkCompression=on
############################# Sink Options #############################
...
4.3 PostgreSQL Connector
The PostgreSQL connector uses the SQL COPY command and its JDBC implementation (PostgreSQL COPY bulk data transfer), which offers great performance.
The PostgreSQL JDBC driver performs much better if the network is not involved in the data transfer. Therefore, it is recommended that ReplicaDB be executed on the same machine where the PostgreSQL database resides.
For monitoring and control purposes, it is useful to identify the connection to PostgreSQL by setting the ApplicationName property.
Example
source.connect=jdbc:postgresql://host:port/db
source.user=pguser
source.password=pgpassword
source.table=schema.table_name
source.connect.parameter.ApplicationName=ReplicaDB
4.4 Denodo Connector
The Denodo connector only applies as a source since, being a data virtualization platform, Denodo is normally used only as a source.
To connect to Denodo, you can review the documentation for more information: Access Through JDBC.
For monitoring and control purposes, it is useful to identify the connection to Denodo by setting the userAgent property.
Example
source.connect=jdbc:vdb://host:port/db
source.user=vdbuser
source.password=vdbpassword
source.table=schema.table_name
source.connect.parameter.userAgent=ReplicaDB
4.5 Amazon S3 Connector
Amazon Simple Storage Service (Amazon S3) provides secure, durable, highly scalable object storage. For information about Amazon S3, see Amazon S3.
The s3 protocol is used in a URL that specifies the location of an Amazon S3 bucket and a prefix to use for writing files in the bucket.
S3 URI format:
s3://S3_endpoint[:port]/bucket_name/[bucket_subfolder]
Example:
sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images
Connecting to Amazon S3 requires an AccessKey and a SecretKey provided by your Amazon S3 account. These security keys are specified as additional parameters in the connection.
You can use the AWS S3 connector with any system compatible with its API, such as MinIO, or other cloud providers such as DreamHost, Wasabi or Dell EMC ECS Object Storage.
4.5.1 Row Object Creation Type
There are two ways to create objects in Amazon S3 through the connector:
- Generate a single CSV file for all rows of a source table
- Generate a binary object for each row of the source table
The behavior is set through the sink.connect.parameter.row.isObject property, which can be true or false.
The purpose of this feature when sink.connect.parameter.row.isObject = true is to be able to extract or replicate LOBs (Large Objects) from sources into individual objects in AWS S3, where for each row of the table the content of the LOB column (BLOB, CLOB, JSON, XMLTYPE, any ...) will be the payload of the object in AWS S3.
Similarly, when sink.connect.parameter.row.isObject = false, ReplicaDB will generate a single CSV file for all rows in the source table and upload it to AWS S3 as an in-memory stream, without intermediate files.
4.5.1.1 One Object Per Row
To generate an object for each row of the source table, it is necessary to set the following properties:
# Each row is a different object in s3
sink.connect.parameter.row.isObject=true
sink.connect.parameter.row.keyColumn=[The name of the source table column used as an object key in AWS S3]
sink.connect.parameter.row.contentColumn=[the name of the source table column used as a payload object of the object in AWS S3]
Example:
############################# ReplicaDB Basics #############################
mode=complete
jobs=4
############################# Source Options #############################
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=product_image
source.columns=product_id || '.jpg' as key_column, image as content_column
############################# Sink Options #############################
sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images
sink.connect.parameter.accessKey=ASDFKLJHIOVNROIUNVSD
sink.connect.parameter.secretKey=naBMm7jVRyeE945m1jIIxMomoRM9rMCiEvVBtQe3
# Each row is a different object in s3
sink.connect.parameter.row.isObject=true
sink.connect.parameter.row.keyColumn=key_column
sink.connect.parameter.row.contentColumn=content_column
The following objects will be generated in AWS S3:
4.5.1.2 One CSV For All Rows
To generate a single CSV file for all rows of a source table, it is necessary to set the following properties:
# All rows are only one CSV object in s3
sink.connect.parameter.csv.keyFileName=[the full name of the target file or object key in AWS S3]
The generated CSV file is RFC 4180 compliant whenever you disable the default escaping, either with the --sink-disable-escape argument or in the options-file:
sink.disable.escape=true
IMPORTANT: To support multi-threaded execution, and since it is not possible to append content to an existing AWS S3 object, ReplicaDB will generate one file per job, renaming each file with the task id.
Example:
############################# ReplicaDB Basics #############################
mode=complete
jobs=4
############################# Source Options #############################
source.connect=jdbc:oracle:thin:@host:port:sid
source.user=orauser
source.password=orapassword
source.table=product_description
############################# Sink Options #############################
sink.connect=s3://s3.eu-west-3.amazonaws.com/replicadb/images
sink.connect.parameter.accessKey=ASDFKLJHIOVNROIUNVSD
sink.connect.parameter.secretKey=naBMm7jVRyeE945m1jIIxMomoRM9rMCiEvVBtQe3
# All rows are only one CSV object in s3
sink.connect.parameter.csv.keyFileName=product_description.csv
sink.disable.escape=true
The following objects will be generated in AWS S3:
4.5.2 Extra parameters
The Amazon S3 connector supports the following extra parameters that can only be defined as extra connection parameters in the options-file:
Parameter | Description | Default |
---|---|---|
accessKey | AWS S3 Access Key ID to access the S3 bucket | Required |
secretKey | AWS S3 Secret Access Key for the S3 Access Key ID to access the S3 bucket | Required |
secure-connection | Sets whether the connection is secure using SSL (HTTPS) | true |
row.isObject | Sets whether each row in the source table is a different object in AWS S3 | false |
row.keyColumn | Sets the name of the column in the source table whose content will be used as the object key (file name) in AWS S3 | Required when row.isObject = true |
row.contentColumn | Sets the name of the column in the source table whose content will be the payload of the object in AWS S3 | Required when row.isObject = true |
csv.keyFileName | Sets the name of the target file or object key in AWS S3 | Required when row.isObject = false |
csv.FieldSeparator | Sets the field separator character | , |
csv.TextDelimiter | Sets the field enclosing character | " |
csv.LineDelimiter | Sets the end-of-line character | \n |
csv.AlwaysDelimitText | Sets whether the text should always be delimited | false |
csv.Header | Sets whether the first line of the file should be the header, with the names of the fields | false |
4.6 MySQL and MariaDB Connector
Because the MariaDB JDBC driver is compatible with MySQL and offers better performance than the MySQL JDBC driver, ReplicaDB uses only the MariaDB driver for both databases.
This connector uses the SQL LOAD DATA INFILE command and its JDBC implementation (see the JDBC API Implementation Notes), which offers great performance.
ReplicaDB automatically sets these connection properties, which are necessary to use the LOAD DATA INFILE command:
characterEncoding=UTF-8
allowLoadLocalInfile=true
rewriteBatchedStatements=true
Example
source.connect=jdbc:mysql://host:port/db
source.user=pguser
source.password=pgpassword
source.table=schema.table_name
4.7 MSSQL Server Connector
ReplicaDB uses the SQL Server bulk copy API to load data into the database.
Hence, depending on the source database, some data types are not supported by the SQL Server bulk copy and must be cast during the replication process to avoid the Specification of length or precision 0 is invalid exception.
In addition, to avoid the Column xx is invalid. Please check your column mappings exception, it will be necessary to include all source and sink columns.
In this example, 4 columns from table t_source in PostgreSQL are replicated to t_sink in SQL Server. The c_numeric and c_decimal columns are converted to text, the binary column is hex encoded, and the xml type column is not supported by the bulk copy API, so it is converted to text.
Example
...
############################# Source Options ##############################
source.connect=jdbc:postgresql://localhost:5432/postgres
source.user=root
source.password=ReplicaDB_1234
source.table=public.t_source
source.columns=c_numeric::text,\
c_decimal::text,\
encode(c_binary_lob, 'hex'),\
c_xml::text
############################# Sink Options ################################
sink.connect=jdbc:sqlserver://localhost:1433;database=master
sink.user=sa
sink.password=ReplicaDB_1234
sink.table=dbo.t_sink
sink.staging.schema=dbo
sink.columns=c_numeric, c_decimal, c_binary_blob, c_xml
4.8 SQLite Connector
In the SQLite connector the complete-atomic mode is not supported, because ReplicaDB performs multiple transactions on the same table and rows and cannot control the transaction isolation level.
Example
...
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
############################# Source Options ##############################
source.connect=jdbc:postgresql://localhost:5432/postgres
source.user=sa
source.password=root
source.table=t_source
############################# Sink Options ################################
sink.connect=jdbc:sqlite:/tmp/replicadb-sqlite.db
sink.table=main.t_sink
sink.staging.schema=main
4.9 MongoDB Connector
The MongoDB connector in ReplicaDB allows you to replicate data between MongoDB databases and other databases supported by ReplicaDB. The MongoDB connector uses the MongoDB Bulk API to perform the replication.
Configuration
To configure the MongoDB connector, you will need to specify the following options in your ReplicaDB configuration file.
Note that source.user, source.password, sink.user and sink.password are not compatible and must be defined in the URI connection string in the source.connect and sink.connect options.
The source.connect and sink.connect URI strings follow the MongoDB URI format:
mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
- For MongoDB Atlas:
mongodb+srv://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
The database name in the URI connection string is required.
Source Options
Parameter | Description |
---|---|
source.connect | The MongoDB connection string with username and password. Compatible with MongoDB Atlas mongodb+srv |
source.user | Not compatible |
source.password | Not compatible |
source.table | The name of the collection in the source database to replicate from |
source.columns | The list of columns to replicate from the source collection ($projection). In JSON format |
source.where | A query filter to apply to the source collection. In JSON format |
source.query | An aggregation pipeline (aggregate) to execute on the source collection. In JSON array format |
Sink Options
Parameter | Description |
---|---|
sink.connect | The MongoDB connection string with username and password. Compatible with MongoDB Atlas mongodb+srv |
sink.user | Not compatible |
sink.password | Not compatible |
sink.table | The name of the collection in the sink database to replicate to |
Incremental Replication
The MongoDB connector supports incremental replication using the source.query or source.where options. To enable incremental replication, you will need to specify a source.query aggregation pipeline that includes a $match stage to filter the documents to be replicated, or a source.where query filter.
The incremental mode for MongoDB as a sink database uses the $merge stage to update the documents in the sink collection. The $merge stage requires a unique index on some field in the sink collection. If the sink collection does not have a unique index, ReplicaDB will throw an exception.
Note that the $merge stage is supported in MongoDB 4.2 and later.
For example, to replicate all documents in the source_collection with a timestamp field greater than or equal to the current time, you could use the following configuration:
Using source.query
mode=incremental
source.query=[{$match: {timestamp: {$gte: new Date()}}},{$sort: {timestamp: 1}}]
source.table=source_collection
sink.table=sink_collection
Using source.where
mode=incremental
source.where={timestamp: {$gte: new Date()}}
source.table=source_collection
sink.table=sink_collection
Examples
1. Replicate all documents from a collection in a local MongoDB instance to a collection in a remote MongoDB Atlas instance:
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
############################# Source Options ##############################
source.connect=mongodb://root:password@127.0.0.1/catalog
source.table=products
############################# Sink Options ################################
sink.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog?retryWrites=true&w=majority
sink.table=products
2. Replicate a subset of documents from a collection in a local MongoDB instance to a collection in a remote MongoDB instance, using the source.query option to specify a MongoDB aggregation pipeline:
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
############################# Source Options ##############################
source.connect=mongodb://root:password@127.0.0.1/catalog
source.table=products
source.query=[{ $match : {startDate:{$gte:ISODate("2021-01-01T00:00:00.000Z")}} } \
,{$sort:{startDate:1}} \
,{$project: {_id:0, shopId:1, productId:1, price:1, startDate:1} } ]
############################# Sink Options ################################
sink.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog?retryWrites=true&w=majority
sink.table=products
3. Replicate a table from a local Postgres database to a collection in a remote MongoDB Atlas instance:
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=jdbc:postgresql://localhost:5432/postgres
source.user=user
source.password=password
source.table=t_roles
# Rename source columns to match mongodb collection sink fields
source.columns=id as "rolId", txt_code as code, txt_role_name as "roleName" \
, cod_v_usuario_creacion as "createdBy", fec_dt_creacion as "createdAt" \
, cod_v_usuario_modificacion as "updatedBy", fec_dt_modificacion as "updatedAt"
############################# Sink Options ################################
sink.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog?retryWrites=true&w=majority
sink.table=roles
4. Replicate a collection from a remote MongoDB Atlas instance to a local Postgres database.
It's mandatory to specify the source.columns projection and the sink.columns option to match the source collection fields to the sink table columns.
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog
source.table=roles
source.columns={_id:0,rolId:1,roleName:1,createdBy:1,createdAt:1,updatedBy:1,updatedAt:1}
source.where={createdAt:{$gte:ISODate("2021-01-01T00:00:00.000Z")}}
############################# Sink Options ################################
sink.connect=jdbc:postgresql://localhost:5432/postgres
sink.user=user
sink.password=password
sink.table=t_roles
sink.columns=id, txt_role_name, cod_v_usuario_creacion, fec_dt_creacion, cod_v_usuario_modificacion, fec_dt_modificacion
5. Replicate a collection from a remote MongoDB Atlas instance to a local Postgres database, using the Postgres jsonb data type and projecting some fields from the source collection.
It's mandatory to specify the source.columns projection and the sink.columns option to match the source collection fields to the sink table columns.
######################## ReplicaDB General Options ########################
mode=complete
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=mongodb+srv://user:password@cluster0.qwertyh.mongodb.net/catalog
source.table=products
source.columns={_id:0,sku:1, document:'$$ROOT'}
############################# Sink Options ################################
sink.connect=jdbc:postgresql://localhost:5432/postgres
sink.user=user
sink.password=password
sink.table=products_jsonb
sink.columns=sku, document
You can also replicate from Postgres to MongoDB using the jsonb data type. ReplicaDB will automatically convert the jsonb data type to a MongoDB document.