在开展 PostgreSQL 增量同步到 MongoDB 的工作之前,先尝试个简单点的全量同步,也能熟悉一下 DataX 的各个配置项和其作用。
附下 DataX 的工作原理:
官方文档中也给了一个 MySQL 同步到 HDFS 的例子,和上面的流程图一起看会更容易理解:
DataX 本身作为离线数据同步框架,采用 Framework+plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。
- Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
- Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
- Framework:Framework 用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
1、基础的配置文件
要实现不同的源数据库读取和目标数据库写入功能,只需要在配置文件中装载不同插件即可。
插件列表:Support Data Channels
这里以 PostgreSQL 读写为例,只要注意一下源列和目标列是靠索引对齐的,字段名可以不一样就行了:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 2,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "{from_username}",
"password": "{from_password}",
"where": "",
"column": [
"{from_column_01}",
"{from_column_02}"
],
"connection": [
{
"table": [
"{from_table}"
],
"jdbcUrl": [
"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "{to_username}",
"password": "{to_password}",
"column": [
"{from_column_01}",
"{from_column_02}"
],
"preSql": [
"DELETE FROM {to_table};"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://{to_host}:{to_port}/{to_database}",
"table": [
"{to_table}"
]
}
]
}
}
}
]
}
}
除开 Python3 能使用 .format()
方法直接替换的参数外,完整的参数介绍可以参照以下的官方文档。
PostgreSQL 读插件:PostgreSQLReader
PostgreSQL 写插件:PostgreSQLWriter
2、我的需求和解决
我的需求很简单:定时同步整表数据,不需要什么搜索条件。
但是在我实际进行同步的时候,碰到了这个问题:
Exception in thread "taskGroup-0" com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-06], Description:[执行数据库 Sql 失败, 请检查您的配置的 column/table/where/querySql或者向 DBA 寻求帮助.]. - 执行的SQL为: select id,name,price,detail from test_table 具体错误信息为:com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-12], Description:[不支持的数据库类型. 请注意查看 DataX 已经支持的数据库类型以及数据库版本.]. - 您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[detail], 字段名称:[1111], 字段Java类型:[java.lang.Object]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .
检查了一下,detail
字段是 JSONB 类型的,无论是读还是写插件文档中都没有标注支持该类型,考虑了一下可能只有 4 个选择了:
- 更改数据库字段类型
- 在读的过程中使用函数将 JSONB 类型改为字符串,写的时候依靠 PosrgreSQL 自动将字符串类型转为 JSONB 存入数据库
- 在读的过程中使用函数将 JSONB 类型改为字符串,写入临时列,等同步结束后再执行语句将临时列数据转为 JSONB 存回原列
摆烂,不同步这个字段
JSONB 存存无用信息很方便,且当前已经有很多基于 JSONB 特性写的 SQL 语句了,排除了 1。
在多次尝试之后发现无法实现,排除掉了 2。
关于 2 的尝试,读的过程中类型转换是很顺利的,写入之前值已经是类似
'{"111": 222}'
的格式了,但是到了正式写入:如果你不强制指定类型,那么会报 PostgreSQLWriter 插件不支持 JSON 类型写入的错误,猜测是读取了数据库中的列类型而并非直接读取值的类型;强制指定类型"detail::VARCHAR"
之后又会报 PostgreSQL 插入错误,无法将 VARCHAR 类型的数据插入 JSONB 类型的列。
最终只剩方案 3,庆幸在表设计之初就准备了备用字段,现在还有 2 个能用,并且后续如果 DataX 插件支持 JSONB 类型,只需要改配置文件就行了,很方便。
修改后的配置文件(加上了我自己表的结构信息):
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 2,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "{from_username}",
"password": "{from_password}",
"where": "",
"column": [
"id",
"name",
"price",
"detail:VARCHAR:"
],
"connection": [
{
"table": [
"test_table"
],
"jdbcUrl": [
"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "{to_username}",
"password": "{to_password}",
"column": [
"id",
"name",
"price",
"spare"
],
"preSql": [
"DELETE FROM test_table;"
],
"postSql": [
"UPDATE test_table SET detail = spare::JSONB;",
"UPDATE test_table SET spare = NULL;"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://{to_host}:{to_port}/{to_database}",
"table": [
"test_table"
]
}
]
}
}
}
]
}
}
会慢一点,适当调高 channel
并发之后还能接受,等数据量上来了再补性能调优。
本章结束。