原文网址:canal--MySQL同步到Redis_IT利刃出鞘的博客-程序员宅基地
Canal的原理是模拟Slave向Master发送请求,Canal解析binlog,但不将解析结果持久化,而是保存在内存中,每次有客户端读取一次消息,就删除该消息。这里所说的客户端,就需要我们写一个连接Canal的程序,持续从Canal获取数据。
本文我们要做的就是完成上图红圈内的部分。
参考网址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
开启mysql的binlog模块
切换到mysql的安装路径(/etc/my.cnf(Linux)/my.ini (windows)),加入如下内容:
[mysqld]
log-bin=mysql-bin #启用binlog
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction,不能和canal的slaveId重复
配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动data
文件夹下的文件,因为容易使mysql下的所有数据库瞬间毁灭。
创建canal用户
创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON 数据库名.表名 TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON 数据库名.表名 TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载部署包
下载,解压,我使用的是最新版本1.0.22
Releases · alibaba/canal · GitHub
配置canal
主要配置的文件有两处,canal/conf/example/instance.properties 和 canal/conf/canal.properties . 而canal.properties 文件我们一般保持默认配置,所以我们仅对instance.properties 进行修改。如果需要对canal进行复杂的配置,可参考canal系列--综述_mysql_feiying0canglang的博客-程序员宅基地。
instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info
canal.instance.master.address = ***.***.***.***:3306 #改成自己的数据库地址
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = canal #改成自己的数据库信息
canal.instance.dbPassword = canal #改成自己的数据库信息
canal.instance.defaultDatabaseName = #改成自己的数据库信息
canal.instance.connectionCharset = UTF-8 #改成自己的数据库信息
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
启动canal
./canal/startup.sh
查看启动状态
可以通过查看logs/canal/canal.log
和logs/example/example.log
日志来判断canal是否启动成功。
canal/logs/canal/canal.log
2016-12-29 14:03:00.956 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2016-12-29 14:03:01.071 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.99:11111]
2016-12-29 14:03:01.628 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
canal/logs/example/example.log
2016-12-29 14:03:01.357 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2016-12-29 14:03:01.362 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2016-12-29 14:03:01.535 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2016-12-29 14:03:01.555 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
参考:https://github.com/alibaba/canal/wiki/ClientExample
在maven项目中中加载canal和redis依赖包.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.sample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>3.1.2.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
<build/>
</project>
这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis。
package canal.sample;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
public class ClientSample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(AddressUtils.getHostIp(),11111),
"example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
private static void printEntry( List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" +
entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn( List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() +
" update=" + column.getUpdated());
}
}
private static void redisInsert( List<Column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
}
}
private static void redisUpdate( List<Column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
}
}
private static void redisDelete( List<Column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.delKey("user:"+ columns.get(0).getValue());
}
}
}
package canal.sample;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
// Redis服务器IP
private static String ADDR = "10.1.2.190";
// Redis的端口号
private static int PORT = 6379;
// 访问密码
private static String AUTH = "admin";
// 可用连接实例的最大数目,默认值为8;
// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
private static int MAX_ACTIVE = 1024;
// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
private static int MAX_IDLE = 200;
// 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int MAX_WAIT = 10000;
// 过期时间
protected static int expireTime = 60 * 60 *24;
// 连接池
protected static JedisPool pool;
/**
* 静态代码,只在初次调用一次
*/
static {
JedisPoolConfig config = new JedisPoolConfig();
//最大连接数
config.setMaxTotal(MAX_ACTIVE);
//最多空闲实例
config.setMaxIdle(MAX_IDLE);
//超时时间
config.setMaxWaitMillis(MAX_WAIT);
//
config.setTestOnBorrow(false);
pool = new JedisPool(config, ADDR, PORT, 1000);
}
/**
* 获取jedis实例
*/
protected static synchronized Jedis getJedis() {
Jedis jedis = null;
try {
jedis = pool.getResource();
} catch (Exception e) {
e.printStackTrace();
if (jedis != null) {
pool.returnBrokenResource(jedis);
}
}
return jedis;
}
/**
* 释放jedis资源
*
* @param jedis
* @param isBroken
*/
protected static void closeResource(Jedis jedis, boolean isBroken) {
try {
if (isBroken) {
pool.returnBrokenResource(jedis);
} else {
pool.returnResource(jedis);
}
} catch (Exception e) {
}
}
/**
* 是否存在key
*
* @param key
*/
public static boolean existKey(String key) {
Jedis jedis = null;
boolean isBroken = false;
try {
jedis = getJedis();
jedis.select(0);
return jedis.exists(key);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return false;
}
/**
* 删除key
*
* @param key
*/
public static void delKey(String key) {
Jedis jedis = null;
boolean isBroken = false;
try {
jedis = getJedis();
jedis.select(0);
jedis.del(key);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
}
/**
* 取得key的值
*
* @param key
*/
public static String stringGet(String key) {
Jedis jedis = null;
boolean isBroken = false;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.get(key);
jedis.expire(key, expireTime);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return lastVal;
}
/**
* 添加string数据
*
* @param key
* @param value
*/
public static String stringSet(String key, String value) {
Jedis jedis = null;
boolean isBroken = false;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.set(key, value);
jedis.expire(key, expireTime);
} catch (Exception e) {
e.printStackTrace();
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return lastVal;
}
/**
* 添加hash数据
*
* @param key
* @param field
* @param value
*/
public static void hashSet(String key, String field, String value) {
boolean isBroken = false;
Jedis jedis = null;
try {
jedis = getJedis();
if (jedis != null) {
jedis.select(0);
jedis.hset(key, field, value);
jedis.expire(key, expireTime);
}
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
}
}
注意:
1,客户端的Jedis连接不同于项目里的Jedis连接需要Spring注解,直接使用静态方法就可以。
运行
1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序
1,如果数据正在更新时,canal服务端挂掉,客户端也会跟着挂掉,mysql依然在插入数据,而redis则因为客户端的关闭而停止更新,造成mysql和redis的数据不一致。解决方法:重启canal服务端和客户端
解析:虽然canal服务端解析binlog后不会把数据持久化,但canal服务端会记录每次客户端消费的位置(客户端每次ack时服务端会记录pos点)。虽然canal服务端因为重启之前解析数据清空,但因为canal服务端记录的是客户端最后一次获取的pos点,canal服务端再从这个pos点开始解析,客户端更新至redis,以达到数据的一致。
2,如果只有一个canal服务端和一个客户端,肯定存在可用性低的问题。有两种解决方法。
法1:用程序来监控canal服务端和客户端,如果挂掉,再重启;
法2:多个canal服务端+zk,将canal服务端的配置文件放在zk,任何一个canal服务端挂掉后,切换到其他canal服务端,读到的配置文件的内容就是一致的(还有记录的消费pos点),保证业务的高可用,客户端可使用相同的做法。
见《从Paxos到Zookeeper 分布式一致性原理与实践》=> 6.3.3 案例三 基于MySQL Binlog的增量订阅和消费组件:Canal
使用canal进行mysql数据同步到Redis_数据库_华仔的逆袭的专栏-程序员宅基地
利用Canal完成Mysql数据同步Redis_数据库_南山行者-程序员宅基地
文章浏览阅读343次。五种原始的变量类型1.Undefined--未定义类型 例:var v;2.String -- ' '或" "3.Boolean4.Number5.Null--空类型 例: var v=null;Number中:NaN -- not a number非数本身是一个数字,但是它和任何数字都不相等,代表非数,它和自己都不相等判断是不是NaN不能用=_curry函数未定义
文章浏览阅读1.2w次,点赞2次,收藏17次。兑换码编码设计当前各个业务系统,只要涉及到产品销售,就离不开大大小小的运营活动需求,其中最普遍的就是兑换码需求,无论是线下活动或者是线上活动,都能起到良好的宣传效果。兑换码:由一系列字符组成,每一个兑换码对应系统中的一组信息,可以是优惠信息(优惠券),也可以是相关奖品信息。在实际的运营活动中,要求兑换码是唯一的,每一个兑换码对应一个优惠信息,而且需求量往往比较大(实际上的需求只有预期_优惠券编码规则
文章浏览阅读45次。C语言程序设计实训教程教学课件作者周林ch04结构化程序设计课件.ppt* * 4.1 选择结构程序设计 4.2 循环结构程序设计 4.3 辅助控制语句 第四章 结构化程序设计 4.1 选择结构程序设计 在现实生活中,需要进行判断和选择的情况是很多的: 如果你在家,我去拜访你 如果考试不及格,要补考 如果遇到红灯,要停车等待 第四章 结构化程序设计 在现实生活中,需要进行判断和选择的情况..._在现实生活中遇到过条件判断的问
文章浏览阅读999次。幻数使用说明 在驱动程序中实现的ioctl函数体内,实际上是有一个switch{case}结构,每一个case对应一个命令码,做出一些相应的操作。怎么实现这些操作,这是每一个程序员自己的事情。 因为设备都是特定的,这里也没法说。关键在于怎样组织命令码,因为在ioctl中命令码是唯一联系用户程序命令和驱动程序支持的途径 。 命令码的组织是有一些讲究的,因为我们一定要做到命令和设备是一一对应的,利_ioctl-number.txt幻数说明
文章浏览阅读399次。键盘按下“Shift+Ctrl+p” 输入: C++Configurations,选择JSON界面做如下改动:1.首先把 “/usr/include”,放在最前2.查看C++路径,终端输入gcc -v -E -x c++ - /usr/include/c++/5 /usr/include/x86_64-linux-gnu/c++/5 /usr/include/c++/5/backward /usr/lib/gcc/x86_64-linux-gnu/5/include /usr/local/_orb-slam3 include 报错
文章浏览阅读129次。本系列的最后一篇,因未有精力写更多的入门教程,上篇已经抛出书单,有兴趣的朋友可阅读好书来成长,此系列主讲有理由爱Sqlserver的论证性文章,希望读者们看完后,可自行做出判断,Sqlserver是否真的合适自己,目的已达成。渴望自动化及使用场景笔者所最能接触到的群体为Excel、PowerBI用户群体,在Excel中,我们知道可以使用VBA、VSTO来给Excel带来自动化操作..._sqlsever 数据分析
文章浏览阅读294次,点赞6次,收藏4次。教育智脑)建立学校的全连接中台,对学校运营过程中的数据进行处理和标准化管理,挖掘数据的价值。能:一、原先孤立的系统聚合到一个统一的平台,实现单点登录,统一身份认证,方便管理;三、数据共享,盘活了教育大数据资源,通过对外提供数。的方式构建教育的通用服务能力平台,支撑教育核心服务能力的沉淀和共享。物联网将学校的各要素(人、机、料、法、环、测)全面互联,数据实时。智慧校园解决方案,赋能教学、管理和服务升级,智慧教育体系,该数据平台具有以下几大功。教育大数据平台底座:教育智脑。教育大数据平台,以中国联通。_高校智慧大脑
文章浏览阅读9.5k次,点赞2次,收藏27次。分治法,动态规划法,贪心算法这三者之间有类似之处,比如都需要将问题划分为一个个子问题,然后通过解决这些子问题来解决最终问题。但其实这三者之间的区别还是蛮大的。贪心是则可看成是链式结构回溯和分支界限为穷举式的搜索,其思想的差异是深度优先和广度优先一:分治算法一、基本概念在计算机科学中,分治法是一种很重要的算法。字面上的解释是“分而治之”,就是把一个复杂的问题分成两_算法概念实例
文章浏览阅读5.6k次。考研篇emmmmm,这是我随笔篇章的第二更,原本计划是在中秋放假期间写好的,但是放假的时候被安排写一下单例模式,做了俩机试题目,还刷了下PAT的东西,emmmmm,最主要的还是因为我浪的很开心,没空出时间来写写东西。 距离我考研结束已经快两年了,距离今年的考研还有90天左右。 趁着这个机会回忆一下青春,这一篇会写的比较有趣,好玩,纯粹是为了记录一下当年考研中发生的有趣的事。 首先介绍..._考研调剂抑郁
文章浏览阅读438次。SpringMVC文章目录SpringMVC1、SpringMVC简介1.1 什么是MVC1.2 什么是SpringMVC1.3 SpringMVC的特点2、HelloWorld2.1 开发环境2.2 创建maven工程a>添加web模块b>打包方式:warc>引入依赖2.3 配置web.xml2.4 创建请求控制器2.5 创建SpringMVC的配置文件2.6 测试Helloworld2.7 总结3、@RequestMapping注解3.1 @RequestMapping注解的功能3._class org.springframework.web.filter.characterencodingfilter is not a jakart
文章浏览阅读4.9k次。gdb 远程调试的一个问题:Don't know how to run. Try "help target".它在抱怨不知道怎么跑,目标是什么. 你需要为它指定target remote 或target extended-remote例如:target extended-remote 192.168.1.136:1234指明target 是某IP的某端口完整示例如下:targ..._don't know how to run. try "help target".
文章浏览阅读85次。习题 11、算法描述主要是用两种基本方法:第一是自然语言描述,第二是使用专用工具进行算法描述2、c 语言程序的结构如下:1、c 语言程序由函数组成,每个程序必须具有一个 main 函数作为程序的主控函数。2、“/*“与“*/“之间的内容构成 c 语言程序的注释部分。3、用预处理命令#include 可以包含有关文件的信息。4、大小写字母在 c 语言中是有区别的。5、除 main 函数和标准库函数以..._c语言语法0x1e