Syncing to Elasticsearch with the Alibaba Canal Client
Reference: https://github.com/alibaba/canal/wiki/Sync-ES
Background
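From canal 1.1.1 onward, client-side data synchronization is built in. For an overview of the client adapters, see ClientAdapter.
The canal adapter supports Elasticsearch 6.x.x and above. If you need another version of ES, replace the dependency and recompile the client-adapter.elasticsearch module.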
Elasticsearch adapter
Edit the launcher configuration: application.yml
canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  mode: tcp
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: root
      password: 121212
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - key: exampleKey
        name: es6                         # or es7
        hosts: 127.0.0.1:9300             # ES cluster addresses, comma-separated
        properties:
          mode: transport                 # or rest; either transport mode or rest mode can be specified
          # security.auth: test:123456    # only used for rest mode
          cluster.name: elasticsearch     # es cluster name
The adapter automatically loads every configuration file ending in .yml under conf/es.
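To see which mapping files would match that rule before starting the adapter, a small helper can list them. This is only a sketch that mimics the rule as stated (files ending in .yml directly under conf/es); it is not the adapter's own loader, and the function name list_mapping_files is made up for illustration.

```python
from pathlib import Path


def list_mapping_files(conf_dir):
    """Return the sorted names of files directly under <conf_dir>/es ending in .yml."""
    es_dir = Path(conf_dir) / "es"
    return sorted(
        p.name for p in es_dir.iterdir()
        if p.is_file() and p.name.endswith(".yml")
    )
```

Calling list_mapping_files("conf") from the adapter's installation directory would list, for example, mytest_user.yml once that file exists.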
Adapter table mapping file
Edit the conf/es/mytest_user.yml file:
dataSourceKey: defaultDS        # key of the source data source, matching an entry under srcDataSources above
outerAdapterKey: exampleKey     # matches the key of the ES adapter configured in application.yml
destination: example            # the canal instance or the MQ topic
groupId:                        # the groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user           # ES index name
  _type: _doc                   # ES type name; not needed on es7
  _id: _id                      # ES _id; if this is not configured, the pk item below must be configured and _id is then assigned by ES
#  pk: id                       # if _id is not needed, one property must be named as the primary key
  # SQL mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # array or object property; array:; means the field content is separated by ;
#    _obj: object               # JSON object
  etlCondition: "where a.c_time>='{0}'"     # condition parameter for ETL
  commitBatch: 3000                         # commit batch size
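The etlCondition value contains a {0} placeholder. A plausible reading, and it is an assumption since the article does not spell it out, is that {0} is a positional slot filled with a caller-supplied value when a full import is run, and that the resulting condition is appended to the mapping SQL. The sketch below illustrates that substitution; apply_etl_condition is a hypothetical helper, not a canal API.

```python
def apply_etl_condition(sql, etl_condition, params):
    """Fill {0}, {1}, ... in etl_condition with params and append it to sql.

    Assumption: placeholders are positional. Values are inserted verbatim,
    so they must come from a trusted source.
    """
    condition = etl_condition
    for i, value in enumerate(params):
        condition = condition.replace("{" + str(i) + "}", value)
    return sql.rstrip() + " " + condition


base_sql = "select a.id as _id, a.name as _name from user a"
full_sql = apply_etl_condition(
    base_sql, "where a.c_time>='{0}'", ["2018-10-21 00:00:00"]
)
print(full_sql)
```

This also shows why the main SQL itself carries no where clause: the filter for a bulk import is kept separately in etlCondition.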
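Notes on the SQL mapping
The SQL supports free combination of multi-table joins, subject to some restrictions:
- The main table cannot be a subquery.
- Only left outer join may be used, i.e. the leftmost table must be the main table.
- If a joined secondary table is a subquery, that subquery cannot involve more than one table.
- The main SQL cannot contain a where condition (a where condition is allowed inside a secondary-table subquery but is not recommended, because it may leave the synced data inconsistent, for example when the value of a field used in the where condition is modified).
- Join conditions may only use the = operation on primary and foreign keys; no other constant checks may appear, for example: on a.role_id=b.id and b.status=1
- At least one field of each join condition must appear in the main query. For example, with on a.role_id=b.id, either a.role_id or b.id must appear in the main select statement.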
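Some of these restrictions can be checked mechanically before a mapping file is deployed. The following is a rough sketch using plain text matching, not a real SQL parser and not the adapter's own validation. It covers only the subquery-as-main-table rule, the left-join rule, the no-where rule and the select * rule mentioned in this article, and it can be fooled by parentheses inside string literals. The names collapse_parens and check_mapping_sql are invented for this example.

```python
import re


def collapse_parens(sql):
    """Replace every top-level parenthesised group with the placeholder ' () '."""
    depth, out = 0, []
    for ch in sql:
        if ch == "(":
            if depth == 0:
                out.append(" () ")
            depth += 1
        elif ch == ")":
            depth -= 1
        elif depth == 0:
            out.append(ch)
    return "".join(out)


def check_mapping_sql(sql):
    """Return a list of rule violations found by a few naive text checks."""
    top = collapse_parens(sql.lower())
    problems = []
    if re.search(r"\bselect\s+(?:\w+\.)?\*", top):
        problems.append("select * is not supported")
    if re.search(r"\bfrom\s*\(\)", top):
        problems.append("main table must not be a subquery")
    if re.search(r"\bwhere\b", top):
        problems.append("main SQL must not contain where")
    tokens = top.split()
    for i, tok in enumerate(tokens):
        if tok != "join":
            continue
        prev = tokens[i - 1] if i > 0 else ""
        if prev == "outer" and i > 1:
            prev = tokens[i - 2]
        if prev != "left":
            problems.append("only left outer join is allowed")
    return problems
```

An empty list means none of the checked rules were violated; it does not prove the SQL is acceptable to the adapter.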
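The Elasticsearch mapping properties correspond one-to-one with the values selected by the SQL (select * is not supported). For example, in select a.id as _id, a.name, a.email as _email from user, name maps to the name field of the ES mapping and _email maps to the _email field; the alias, when there is one, is used as the final mapped field name. The _id here can be entered in the configuration file as the _id: _id mapping.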
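The alias rule described above can be illustrated with a small sketch that derives the mapped names from a select list: the alias if one is given, otherwise the bare column name. It is a simplified text-based illustration, assuming a single top-level select with no subquery inside the select list; split_top_level and mapped_fields are hypothetical helpers, not part of canal.

```python
import re


def split_top_level(text, sep=","):
    """Split text on sep, ignoring separators nested inside parentheses."""
    parts, depth, cur = [], 0, []
    for ch in text:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == sep and depth == 0:
            parts.append("".join(cur).strip())
            cur = []
        else:
            cur.append(ch)
    parts.append("".join(cur).strip())
    return parts


def mapped_fields(sql):
    """Return the field name each select item would map to."""
    match = re.search(r"select\s+(.*?)\s+from\s", sql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("no select list found")
    fields = []
    for item in split_top_level(match.group(1)):
        alias = re.search(r"\s+as\s+(\w+)$", item, re.IGNORECASE)
        # Use the alias when present, else the column name without its table prefix.
        fields.append(alias.group(1) if alias else item.split(".")[-1])
    return fields


print(mapped_fields("select a.id as _id, a.name, a.email as _email from user"))
```

For the example statement this yields _id, name and _email, matching the description in the text.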
Single-table mapping: example SQL
select a.id as _id, a.name, a.role_id, a.c_time from user a
Example ES mapping for this SQL:
{
  "mytest_user": {
    "mappings": {
      "_doc": {
        "properties": {
          "name": {
            "type": "text"
          },
          "role_id": {
            "type": "long"
          },
          "c_time": {
            "type": "date"
          }
        }
      }
    }
  }
}
Single-table mapping: example SQL with functions or arithmetic
select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a
A function or expression field must be followed by an alias. Example ES mapping for this SQL:
{
  "mytest_user": {
    "mappings": {
      "_doc": {
        "properties": {
          "name": {
            "type": "text"
          },
          "role_id": {
            "type": "long"
          },
          "c_time": {
            "type": "date"
          }
        }
      }
    }
  }
}
Multi-table mapping (one-to-one, many-to-one): example SQL
select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a
left join role b on b.id = a.role_id
Note: the join here must be a left outer join, and the first table must be the main table!
Example ES mapping for this SQL:
{
  "mytest_user": {
    "mappings": {
      "_doc": {
        "properties": {
          "name": {
            "type": "text"
          },
          "role_id": {
            "type": "long"
          },
          "role_name": {
            "type": "text"
          },
          "c_time": {
            "type": "date"
          }
        }
      }
    }
  }
}
Multi-table mapping (one-to-many): example SQL
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id
Note: the subquery after left join may involve only one table; that is, the subquery cannot itself contain another subquery or a join!
Example ES mapping for this SQL:
{
  "mytest_user": {
    "mappings": {
      "_doc": {
        "properties": {
          "name": {
            "type": "text"
          },
          "role_id": {
            "type": "long"
          },
          "c_time": {
            "type": "date"
          },
          "labels": {
            "type": "text"
          }
        }
      }
    }
  }
}
Other kinds of SQL examples
(1) geo type
select ... concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location, ...
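The concat expression above produces a "latitude,longitude" string, which is one of the formats Elasticsearch accepts for a geo_point field. The sketch below mirrors the same null handling in Python to show the value that ends up in location. It assumes the target field is mapped as geo_point, which this article does not show, and MySQL may format decimal columns differently (for example with trailing zeros). The function name location_value is illustrative only.

```python
def location_value(latitude, longitude):
    """Mimic concat(IFNULL(lat, 0), ',', IFNULL(lon, 0)): None becomes 0."""
    lat = 0 if latitude is None else latitude
    lon = 0 if longitude is None else longitude
    return f"{lat},{lon}"


print(location_value(31.2, 121.5))
print(location_value(None, 121.5))
```

Note the order: latitude first, then longitude, when the geo_point is given as a string.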
(2) Composite primary key
select concat(a.id,'_',b.type) as _id, ... from user a left join role b on b.id=a.role_id
(3) Array field
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id
In the configuration, use:
objFields:
  labels: array:;
(4) Object field
select a.id as _id, a.name, a.role_id, a.c_time, a.description from user a
In the configuration, use:
objFields:
  description: object
Here the content of the a.description field is a JSON string.
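The two objFields rules, array:; from (3) and object from (4), amount to a conversion applied to the raw column value before indexing. The sketch below shows that conversion under the reading given in this article: array:<sep> splits the string on the separator, and object parses it as JSON. It illustrates the idea and is not the adapter's code; parse_rule and convert_field are hypothetical names, and the sample values are invented.

```python
import json


def parse_rule(rule):
    """Split a rule such as 'array:;' into its kind and separator."""
    kind, _, separator = rule.partition(":")
    return kind, separator


def convert_field(rule, raw):
    """Convert a raw column value according to an objFields rule."""
    if raw is None:
        return None
    kind, separator = parse_rule(rule)
    if kind == "array":
        return raw.split(separator)
    if kind == "object":
        return json.loads(raw)
    raise ValueError(f"unsupported objFields rule: {rule}")


print(convert_field("array:;", "vip;new;beta"))
print(convert_field("object", '{"city": "Hangzhou"}'))
```

With these rules, a group_concat result becomes a JSON array in the document and a JSON string column becomes a nested object instead of plain text.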
(5) Parent-child document index
es/customer.yml
esMapping:
  _index: customer
  _type: _doc
  _id: id
  relations:
    customer_order:
      name: customer
  sql: "select t.id, t.name, t.email from customer t"
es/order.yml
esMapping:
  _index: customer
  _type: _doc
  _id: _id
  relations:
    customer_order:
      name: order
      parent: customer_id
  sql: "select concat('oid_', t.id) as _id,
        t.customer_id,
        t.id as order_id,
        t.serial_code as order_serial,
        t.c_time as order_time
        from biz_order t"
  skips:
    - customer_id
Example mapping:
{
  "mappings": {
    "_doc": {
      "properties": {
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text"
        },
        "email": {
          "type": "text"
        },
        "order_id": {
          "type": "long"
        },
        "order_serial": {
          "type": "text"
        },
        "order_time": {
          "type": "date"
        },
        "customer_order": {
          "type": "join",
          "relations": {
            "customer": "order"
          }
        }
      }
    }
  }
}
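With a join field such as customer_order, Elasticsearch expects a child document to carry the relation name plus its parent's id, and to be routed to the parent's shard. The sketch below builds such a child document from one row of the order.yml query. The output shape (a dict with _id, routing and _source) is an illustration of the Elasticsearch join-field convention and an assumption about what gets indexed, not a dump of what the adapter actually sends; build_order_doc and the sample row are hypothetical.

```python
def build_order_doc(row, relation_field="customer_order", skips=("customer_id",)):
    """Build a child ('order') document for the customer_order join field."""
    parent_id = str(row["customer_id"])
    # Drop the document id and any skipped columns from the indexed source.
    source = {k: v for k, v in row.items() if k != "_id" and k not in skips}
    source[relation_field] = {"name": "order", "parent": parent_id}
    # Child documents must be routed to the same shard as their parent.
    return {"_id": row["_id"], "routing": parent_id, "_source": source}


row = {
    "_id": "oid_5",
    "customer_id": 1,
    "order_id": 5,
    "order_serial": "S-001",
    "order_time": "2019-01-01T00:00:00",
}
print(build_order_doc(row))
```

Here customer_id is used only to locate the parent and is left out of the indexed source, which corresponds to the skips entry in order.yml.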
Starting ES data synchronization
Start the canal-adapter launcher
bin/startup.sh
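Verification
- Insert rows into the MySQL mytest.user table; they are synced automatically to the ES mytest_user index, and the DML log is printed.
- Modify role_name in the MySQL mytest.role table; the role_name data in the ES mytest_user index is synced automatically.
- Insert or modify label in the MySQL mytest.label table; the labels data in the ES mytest_user index is synced automatically.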
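One way to confirm the result is to query the index over the Elasticsearch REST API after changing a row in MySQL. The sketch below assumes the REST endpoint is reachable at 127.0.0.1:9200 (the article only shows the transport address on port 9300) and uses a made-up name value, Eric; search_url and fetch_hits are illustrative helpers.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def search_url(host, index, field, value):
    """Build a URI-search URL of the form http://host/index/_search?q=field:value."""
    query = urlencode({"q": f"{field}:{value}"})
    return f"http://{host}/{index}/_search?{query}"


def fetch_hits(url, timeout=5):
    """Run the search and return the _source of each matching document."""
    with urlopen(url, timeout=timeout) as resp:
        body = json.load(resp)
    return [hit["_source"] for hit in body["hits"]["hits"]]


# Example (needs a reachable ES node, so it is left commented out):
# print(fetch_hits(search_url("127.0.0.1:9200", "mytest_user", "name", "Eric")))
```

If the sync worked, the returned documents should show the new or updated values for name, role_name or labels.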
Copyright notice:
Author: Joe.Ye
Link: https://www.appblog.cn/index.php/2023/03/25/alibaba-canal-client-synchronize-elasticsearch/
Source: APP全栈技术分享
Copyright of this article belongs to the author. Do not reproduce without permission.