为jdbc sink连接器提取和转换kafka消息的特定字段。

我有一个kafka主题,使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一个消息的格式。

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

我想推送 ts_ms, before, afterid (从objectrow)列到另一个数据库,使用jdbc sink连接器,表的模式是 (id,before(text),after(text),timestamp)我是一个新的kafka新手,不知道该怎么做。

  • 我怎样才能从消息中只提取这些字段来推送而忽略其他字段?

  • 我怎么能把之前,之后的字段转换成字符串序列化格式?

  • 我怎么能提取 id 从对象? (如果是插入操作,前面为空,如果是删除,后面为空)

对于上面的消息,sink目的表的最后应该有下面这样的数据。

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815

解决方案:

你可以使用连锁的 Kafka连接转换,像这样 解决办法.

给TA打赏
共{{data.count}}人
人已打赏
解决方案

在序列化时对SnakeYaml应用格式化。

2022-4-22 16:09:02

解决方案

在Python类中使用for循环更新列表

2022-4-22 16:09:04

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索