DataX 实践(三)从 PostgreSQL 读数据库增量同步表到 MongoDB 写数据库

对价格字段进行增量同步。


简单阐述全量同步和增量同步区别:
全量同步:就是每天定时(避开业务高峰期)或者周期性全量把数据从一个地方拷贝到另外一个地方,可以采用直接全部覆盖(使用新数据覆盖旧数据)或者走更新逻辑(覆盖前判断下,如果新旧不一致,就更新)。
增量同步:就是指抓取某个时刻(更新时间)或者检查点以后的数据来同步,不是无规律的全量同步。


1、基础的配置文件
MongoDB 写插件的官方文档:Datax MongoDBWriter

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {}
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "{from_username}",
                        "password": "{from_password}",
                        "where": "",
                        "column": [
                            "{from_column_01}",
                            "{from_column_02}"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "{from_table}"
                                ],
                                "jdbcUrl": [
                                    "jdbc:postgresql://{from_host}:{from_port}/{from_database}"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": [
                            "{to_host}:{to_port}"
                        ],
                        "userName": "{to_username}",
                        "userPassword": "{to_password}",
                        "dbName": "{to_database}",
                        "collectionName": "smr_{web}_{mode}_history",
                        "column": [
                            {
                                "name": "{from_column_01}",
                                "type": "string"
                            },
                            {
                                "name": "{from_column_02}",
                                "type": "string"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

2、我的需求和解决
我的需求是:定时将 PostgreSQL 表中的价格字段和更新时间字段,以更新时间的时间戳作为主键,保存到 MongoDB 中。
写起来还是很简单的,只要注意一下 PostgreSQL 时间戳的转换就行了。
最后的配置文件:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 2,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "{from_username}",
                        "password": "{from_password}",
                        "where": "",
                        "column": [
                            "update_time",
                            "price"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "test_table"
                                ],
                                "jdbcUrl": [
                                    "jdbc:postgresql://{from_host}:{from_port}/{from_database}"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": [
                            "{to_host}:{to_port}"
                        ],
                        "userName": "{to_username}",
                        "userPassword": "{to_password}",
                        "dbName": "{to_database}",
                        "collectionName": "test_table_price_history",
                        "column": [
                            {
                                "name": "time",
                                "type": "int"
                            },
                            {
                                "name": "price",
                                "type": "double"
                            }
                        ],
                        "upsertInfo": {
                            "isUpsert": "false"
                        }
                    }
                }
            }
        ]
    }
}

结束。