DataX 实践(二)从 PostgreSQL 读数据库全量同步表到 PostgreSQL 写数据库

在开展 PostgreSQL 增量同步到 MongoDB 的工作之前,先尝试个简单点的全量同步,也能熟悉一下 DataX 的各个配置项和其作用。


附下 DataX 的工作原理:
DataX 的工作原理
官方文档中也给了一个 MySQL 同步到 HDFS 的例子,和上面的流程图一起看会更容易理解: datax_framework_new
DataX 本身作为离线数据同步框架,采用 Framework+plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
  • Framework:Framework 用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1、基础的配置文件
要实现不同的源数据库读取和目标数据库写入功能,只需要在配置文件中装载不同插件即可。
插件列表:Support Data Channels
这里以 PostgreSQL 读写为例,只要注意一下源列和目标列是靠索引对齐的,字段名可以不一样就行了:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 2,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "{from_username}",
                        "password": "{from_password}",
                        "where": "",
                        "column": [
                            "{from_column_01}",
                            "{from_column_02}"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "{from_table}"
                                ],
                                "jdbcUrl": [
                                    "jdbc:postgresql://{from_host}:{from_port}/{from_database}"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "username": "{to_username}",
                        "password": "{to_password}",
                        "column": [
                            "{from_column_01}",
                            "{from_column_02}"
                        ],
                        "preSql": [
                            "DELETE FROM {to_table};"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://{to_host}:{to_port}/{to_database}",
                                "table": [
                                    "{to_table}"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

除开 Python3 能使用 .format() 方法直接替换的参数外,完整的参数介绍可以参照以下的官方文档。
PostgreSQL 读插件:PostgreSQLReader
PostgreSQL 写插件:PostgreSQLWriter

2、我的需求和解决
我的需求很简单:定时同步整表数据,不需要什么搜索条件。
但是在我实际进行同步的时候,碰到了这个问题:

Exception in thread "taskGroup-0" com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-06], Description:[执行数据库 Sql 失败, 请检查您的配置的 column/table/where/querySql或者向 DBA 寻求帮助.].  - 执行的SQL为: select id,name,price,detail from test_table  具体错误信息为:com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-12], Description:[不支持的数据库类型. 请注意查看 DataX 已经支持的数据库类型以及数据库版本.].  - 您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[detail], 字段名称:[1111], 字段Java类型:[java.lang.Object]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .

检查了一下,detail 字段是 JSONB 类型的,无论是读还是写插件文档中都没有标注支持该类型,考虑了一下可能只有 4 个选择了:

  1. 更改数据库字段类型
  2. 在读的过程中使用函数将 JSONB 类型改为字符串,写的时候依靠 PosrgreSQL 自动将字符串类型转为 JSONB 存入数据库
  3. 在读的过程中使用函数将 JSONB 类型改为字符串,写入临时列,等同步结束后再执行语句将临时列数据转为 JSONB 存回原列
  4. 摆烂,不同步这个字段

JSONB 存存无用信息很方便,且当前已经有很多基于 JSONB 特性写的 SQL 语句了,排除了 1。
在多次尝试之后发现无法实现,排除掉了 2。

关于 2 的尝试,读的过程中类型转换是很顺利的,写入之前值已经是类似 '{"111": 222}' 的格式了,但是到了正式写入:如果你不强制指定类型,那么会报 PostgreSQLWriter 插件不支持 JSON 类型写入的错误,猜测是读取了数据库中的列类型而并非直接读取值的类型;强制指定类型 "detail::VARCHAR" 之后又会报 PostgreSQL 插入错误,无法将 VARCHAR 类型的数据插入 JSONB 类型的列。

最终只剩方案 3,庆幸在表设计之初就准备了备用字段,现在还有 2 个能用,并且后续如果 DataX 插件支持 JSONB 类型,只需要改配置文件就行了,很方便。
修改后的配置文件(加上了我自己表的结构信息):

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 2,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "{from_username}",
                        "password": "{from_password}",
                        "where": "",
                        "column": [
                            "id",
                            "name",
                            "price",
                            "detail:VARCHAR:"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "test_table"
                                ],
                                "jdbcUrl": [
                                    "jdbc:postgresql://{from_host}:{from_port}/{from_database}"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "username": "{to_username}",
                        "password": "{to_password}",
                        "column": [
                            "id",
                            "name",
                            "price",
                            "spare"
                        ],
                        "preSql": [
                            "DELETE FROM test_table;"
                        ],
                        "postSql": [
                            "UPDATE test_table SET detail = spare::JSONB;",
                            "UPDATE test_table SET spare = NULL;"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://{to_host}:{to_port}/{to_database}",
                                "table": [
                                    "test_table"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

会慢一点,适当调高 channel 并发之后还能接受,等数据量上来了再补性能调优。

本章结束。