新聞中心
場景
在微服務拆分的架構(gòu)中,各服務擁有自己的數(shù)據(jù)庫,所以常常會遇到服務之間數(shù)據(jù)通信的問題。比如,B服務數(shù)據(jù)庫的數(shù)據(jù)來源于A服務的數(shù)據(jù)庫;A服務的數(shù)據(jù)有變更操作時,需要同步到B服務中。

解決方案
1、 在代碼邏輯中,有相關A服務數(shù)據(jù)寫操作時,以調(diào)用接口的方式,調(diào)用B服務接口,B服務再將數(shù)據(jù)寫到新的數(shù)據(jù)庫中。這種方式看似簡單,但其實“坑”很多。在A服務代碼邏輯中會增加大量這種調(diào)用接口同步的代碼,增加了項目代碼的復雜度,以后會越來越難維護。并且,接口調(diào)用的方式并不是一個穩(wěn)定的方式,沒有重試機制,沒有同步位置記錄,接口調(diào)用失敗了怎么處理,突然的大量接口調(diào)用會產(chǎn)生的問題等,這些都要考慮并且在業(yè)務中處理。這里會有不少工作量。想到這里,就將這個方案排除了。
2、通過數(shù)據(jù)庫的binlog進行同步。這種解決方案,與A服務是獨立的,不會和A服務有代碼上的耦合??梢灾苯覶CP連接進行傳輸數(shù)據(jù),優(yōu)于接口調(diào)用的方式。 這是一套成熟的生產(chǎn)解決方案,也有不少binlog同步的中間件工具,所以我們關注的就是哪個工具能夠更好的構(gòu)建穩(wěn)定、性能滿足且易于高可用部署的方案。
經(jīng)過調(diào)研,我們選擇了canal[
https://github.com/alibaba/canal]。canal是阿里巴巴 MySQL binlog 增量訂閱&消費組件,已經(jīng)有在生產(chǎn)上實踐的例子,并且方便的支持和其他常用的中間件組件組合,比如kafka,elasticsearch等,也有了canal-go go語言的client庫,滿足我們在go上的需求,其他具體內(nèi)容參閱canal的github主頁。
原理簡圖
工作流程
1.Canal連接到A數(shù)據(jù)庫,模擬slave
2.canal-client與Canal建立連接,并訂閱對應的數(shù)據(jù)庫表
3.A數(shù)據(jù)庫發(fā)生變更寫入到binlog,Canal向數(shù)據(jù)庫發(fā)送dump請求,獲取binlog并解析,發(fā)送解析后的數(shù)據(jù)給canal-client
4.canal-client收到數(shù)據(jù),將數(shù)據(jù)同步到新的數(shù)據(jù)庫
安裝canal
下載canal
修改配置/conf/canal.properties
# ...
# 可選項: tcp(默認), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage模式下可以調(diào)大該值, 但不要超過MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請將該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數(shù)據(jù)的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務
canal.mq.transaction = false
# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
#解決消費順序問題
canal.mq.partitionHash=mydatabase.mytable
然后配置instance,找到
/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自動生成,不需配置)
# canal.instance.mysql.slaveId=0
# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql執(zhí)行 SHOW MASTER STATUS;查看當前數(shù)據(jù)庫的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 賬號密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ隊列名稱
canal.mq.topic=canaltopic
#單隊列模式的分區(qū)下標
canal.mq.partition=0
啟動zookeeper和kafka
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
啟動 canal
canal/bin/start.bat
編寫讀取消息的相關代碼
kafka相關配置
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
# 發(fā)生錯誤后,消息重發(fā)的次數(shù)。
retries: 0
#當有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計算。
batch-size: 16384
# 設置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
buffer-memory: 33554432
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務器的響應。
# acks=1 : 只要集群的首領節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務器成功響應。
# acks=all :只有當所有參與復制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務器的成功響應。
acks: 1
consumer:
# 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1
# 該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
# latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)
# earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄
auto-offset-reset: earliest
# 是否自動提交偏移量,默認值是true,為了避免出現(xiàn)重復數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設置為false,然后手動提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在偵聽器容器中運行的線程數(shù)。
concurrency: 5
#listner負責ack,每調(diào)用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private String retries;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
/**
* 生產(chǎn)者配置信息
*/
@Bean
public MapproducerConfigs() {
Mapprops = new HashMap();
//重試,0為不啟用重試機制
props.put(ProducerConfig.ACKS_CONFIG, "all");
//連接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 2);
//控制批處理大小,單位為字節(jié)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//批量發(fā)送,延遲為1毫秒,啟用該功能能有效減少生產(chǎn)者發(fā)送消息次數(shù),從而提高并發(fā)量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//生產(chǎn)者可以使用的總內(nèi)存字節(jié)來緩沖等待發(fā)送到服務器的記錄
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//鍵的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return props;
}
/** kafka無事務模式
* @return
*/
/* @Bean
public ProducerFactoryproducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}*/
/**
* 開啟kafka事務
*
* @return
*/
@Bean
public ProducerFactoryproducerFactory() {
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//在producerFactory中開啟事務功能
factory.transactionCapable();
//TransactionIdPrefix是用來生成Transactional.id的前綴
factory.setTransactionIdPrefix("tran-");
return factory;
}
@Bean
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
return manager;
}
@Bean
public KafkaTemplatekafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
讀取消息
/**
* 如何解決topic指定對應的表(一個topic對應一個表即可解決此問題)
* @param record
* @param ack
* @param topic
*/
@KafkaListener(topics = KafkaConstants.CANAL_TOPIC, groupId = KafkaConstants.DISPATCH_GROUP)
public void canalConsumer(ConsumerRecord, ?> record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
String messageStr = (String) message.get();
CanalDtocanalDto = JSONObject.parseObject(messageStr, CanalDto.class);
LOGGER.info("canalConsumer 消費了: Topic:{},Message:{}", topic, messageStr);
LOGGER.info(canalDto.toString());
boolean isDdl = canalDto.isDdl();
if(!isDdl){
String type = canalDto.getType();
Listdata = canalDto.getData();
if("INSERT".equals(type)){
mongodbBase.batchSave(data,OrderTbl.class);
}else if ("UPDATE".equals(type)) {
// mongodbBase.updateFirst();
}else {
//刪除語句
for (OrderTbl orderTbl : data) {
mongodbBase.remove(orderTbl);
}
}
}
ack.acknowledge();
}
}
canal實體信息
public class CanalDtoimplements Serializable {
private static final long serialVersionUID = 3652575521269639607L;
//數(shù)據(jù)
private Listdata;
//數(shù)據(jù)庫名稱
private String database;
private long es;
//遞增,從1開始
private int id;
//是否是DDL語句
private boolean isDdl;
//表結(jié)構(gòu)的字段類型
private MysqlType mysqlType;
//UPDATE語句,舊數(shù)據(jù)
private String old;
//主鍵名稱
private ListpkNames;
//sql語句
private String sql;
private SqlTypeDto sqlType;
//表名
private String table;
private long ts;
//(新增)INSERT、(更新)UPDATE、(刪除)DELETE、(刪除表)ERASE等等
private String type;
}
分享文章:一篇帶給你跨數(shù)據(jù)源實現(xiàn)數(shù)據(jù)同步
本文地址:http://m.5511xx.com/article/dhgeehj.html


咨詢
建站咨詢
