为jdbc sink连接器提取和转换kafka消息的特定字段。

我有一个kafka主题,使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一个消息的格式。

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

我想推送 ts_ms, before, afterid (从objectrow)列到另一个数据库,使用jdbc sink连接器,表的模式是 (id,before(text),after(text),timestamp)我是一个新的kafka新手,不知道该怎么做。

  • 我怎样才能从消息中只提取这些字段来推送而忽略其他字段?

  • 我怎么能把之前,之后的字段转换成字符串序列化格式?

  • 我怎么能提取 id 从对象? (如果是插入操作,前面为空,如果是删除,后面为空)

对于上面的消息,sink目的表的最后应该有下面这样的数据。

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815

解决方案:

你可以使用连锁的 Kafka连接转换,像这样 解决办法.

本文来自投稿,不代表运维实战侠立场,如若转载,请注明出处:https://www.shizhanxia.com/524.html

(0)
上一篇 2022年6月29日 下午3:57
下一篇 2022年6月29日 下午3:57

相关推荐

发表评论

登录后才能评论