我有一个kafka主题,使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一个消息的格式。
{
"Message": {
"schema": {
"type": "struct",
"fields": [
...
],
"optional": true,
"name": "mysql-server-1.inventory.somename"
},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Doof",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Marry",
"email": "annek@noanswer.org"
},
"source": {
"db": "inventory",
"table": "customers",
...
"query": "Update customers set last_name = 'Marry' where id = 1004"
}
}
}
}
我想推送 ts_ms, before, after
和 id
(从objectrow)列到另一个数据库,使用jdbc sink连接器,表的模式是 (id,before(text),after(text),timestamp)
我是一个新的kafka新手,不知道该怎么做。
-
我怎样才能从消息中只提取这些字段来推送而忽略其他字段?
-
我怎么能把之前,之后的字段转换成字符串序列化格式?
-
我怎么能提取
id
从对象? (如果是插入操作,前面为空,如果是删除,后面为空)
对于上面的消息,sink目的表的最后应该有下面这样的数据。
id: 1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after: '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815
解决方案:
本文来自投稿,不代表运维实战侠立场,如若转载,请注明出处:https://www.shizhanxia.com/524.html