canal--MySQL同步到Redis_canall同步 到redis-程序员宅基地

技术标签: 数据库中间件  mysql  

原文网址:canal--MySQL同步到Redis_IT利刃出鞘的博客-程序员宅基地

流程

        Canal的原理是模拟Slave向Master发送请求,Canal解析binlog,但不将解析结果持久化,而是保存在内存中,每次有客户端读取一次消息,就删除该消息。这里所说的客户端,就需要我们写一个连接Canal的程序,持续从Canal获取数据。
        本文我们要做的就是完成上图红圈内的部分。

安装与部署

MySQL的配置

参考网址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

开启mysql的binlog模块

切换到mysql的安装路径(/etc/my.cnf(Linux)/my.ini (windows)),加入如下内容:

[mysqld]
log-bin=mysql-bin #启用binlog
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction,不能和canal的slaveId重复

        配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动data文件夹下的文件,因为容易使mysql下的所有数据库瞬间毁灭

创建canal用户

创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容

CREATE USER canal IDENTIFIED BY 'canal';    
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON 数据库名.表名 TO 'canal'@'%';  
-- GRANT ALL PRIVILEGES ON 数据库名.表名 TO 'canal'@'%' ;  
FLUSH PRIVILEGES;  

canal配置与部署

下载部署包

下载,解压,我使用的是最新版本1.0.22

Releases · alibaba/canal · GitHub

配置canal 

主要配置的文件有两处,canal/conf/example/instance.properties 和 canal/conf/canal.properties . 而canal.properties 文件我们一般保持默认配置,所以我们仅对instance.properties 进行修改。如果需要对canal进行复杂的配置,可参考canal系列--综述_mysql_feiying0canglang的博客-程序员宅基地

instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info
canal.instance.master.address = ***.***.***.***:3306 #改成自己的数据库地址
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbUsername = canal #改成自己的数据库信息 
canal.instance.dbPassword = canal #改成自己的数据库信息 
canal.instance.defaultDatabaseName =  #改成自己的数据库信息
canal.instance.connectionCharset = UTF-8 #改成自己的数据库信息 

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex = 

启动canal

./canal/startup.sh

查看启动状态 

可以通过查看logs/canal/canal.log 和logs/example/example.log日志来判断canal是否启动成功。

canal/logs/canal/canal.log

2016-12-29 14:03:00.956 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2016-12-29 14:03:01.071 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.99:11111]
2016-12-29 14:03:01.628 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

canal/logs/example/example.log

2016-12-29 14:03:01.357 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2016-12-29 14:03:01.362 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2016-12-29 14:03:01.535 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2016-12-29 14:03:01.555 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

代码

参考:https://github.com/alibaba/canal/wiki/ClientExample

pom.xml

在maven项目中中加载canal和redis依赖包.

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>  
        <groupId>com.alibaba.otter</groupId>  
        <artifactId>canal.client</artifactId>  
        <version>1.0.12</version>  
    </dependency>  
    
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-test</artifactId>  
        <version>3.1.2.RELEASE</version>  
        <scope>test</scope>  
    </dependency>  
      
    <dependency>  
        <groupId>redis.clients</groupId>  
        <artifactId>jedis</artifactId>  
        <version>2.4.2</version>  
    </dependency>  
    
    </dependencies>
  <build/>
</project>

ClientSample.java

这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis。

package canal.sample;
 
import java.net.InetSocketAddress;  
import java.util.List;  
 
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.common.utils.AddressUtils;  
import com.alibaba.otter.canal.protocol.Message;  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import com.alibaba.otter.canal.client.*;  
 
public class ClientSample {  
 
   public static void main(String args[]) {  
	   
       // 创建链接  
       CanalConnector connector = CanalConnectors.newSingleConnector(
                                     new InetSocketAddress(AddressUtils.getHostIp(),11111), 
                                      "example", "", "");  
       int batchSize = 1000;  
       try {  
           connector.connect();  
           connector.subscribe(".*\\..*");  
           connector.rollback();    
           while (true) {  
               Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
               long batchId = message.getId();  
               int size = message.getEntries().size();  
               if (batchId == -1 || size == 0) {  
                   try {  
                       Thread.sleep(1000);  
                   } catch (InterruptedException e) {  
                       e.printStackTrace();  
                   }  
               } else {  
                   printEntry(message.getEntries());  
               }  
 
               connector.ack(batchId); // 提交确认  
               // connector.rollback(batchId); // 处理失败, 回滚数据  
           }  
 
       } finally {  
           connector.disconnect();  
       }  
   }  
 
   private static void printEntry( List<Entry> entrys) {  
       for (Entry entry : entrys) {  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || 
                     entry.getEntryType() == EntryType.TRANSACTIONEND) {  
               continue;  
           }  
 
           RowChange rowChage = null;  
           try {  
               rowChage = RowChange.parseFrom(entry.getStoreValue());  
           } catch (Exception e) {  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + 
                       entry.toString(),  
                       e);  
           }  
 
           EventType eventType = rowChage.getEventType();  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                   eventType));  
 
           for (RowData rowData : rowChage.getRowDatasList()) {  
               if (eventType == EventType.DELETE) {  
            	   redisDelete(rowData.getBeforeColumnsList());  
               } else if (eventType == EventType.INSERT) {  
            	   redisInsert(rowData.getAfterColumnsList());  
               } else {  
                   System.out.println("-------> before");  
                   printColumn(rowData.getBeforeColumnsList());  
                   System.out.println("-------> after");  
                   redisUpdate(rowData.getAfterColumnsList());  
               }  
           }  
       }  
   }  
 
   private static void printColumn( List<Column> columns) {  
       for (Column column : columns) {  
           System.out.println(column.getName() + " : " + column.getValue() + 
                       "    update=" + column.getUpdated());  
       }  
   }  
   
	  private static void redisInsert( List<Column> columns){
		  JSONObject json=new JSONObject();
		  for (Column column : columns) {  
			  json.put(column.getName(), column.getValue());  
	       }  
		  if(columns.size()>0){
			  RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
		  }
	   }
	  
	  private static  void redisUpdate( List<Column> columns){
		  JSONObject json=new JSONObject();
		  for (Column column : columns) {  
			  json.put(column.getName(), column.getValue());  
	       }  
		  if(columns.size()>0){
			  RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
		  }
	  }
  
	   private static  void redisDelete( List<Column> columns){
		   JSONObject json=new JSONObject();
			  for (Column column : columns) {  
				  json.put(column.getName(), column.getValue());  
		       }  
			  if(columns.size()>0){
				  RedisUtil.delKey("user:"+ columns.get(0).getValue());
			  }
	   }
}  

RedisUtil.java

package canal.sample;
 
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
 
public class RedisUtil {
 
	// Redis服务器IP
	private static String ADDR = "10.1.2.190";
 
	// Redis的端口号
	private static int PORT = 6379;
 
	// 访问密码
	private static String AUTH = "admin";
 
	// 可用连接实例的最大数目,默认值为8;
	// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
	private static int MAX_ACTIVE = 1024;
 
	// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
	private static int MAX_IDLE = 200;
 
	// 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
	private static int MAX_WAIT = 10000;
 
	// 过期时间
	protected static int  expireTime = 60 * 60 *24;
	
	// 连接池
	protected static JedisPool pool;
 
	/**
	 * 静态代码,只在初次调用一次
	 */
	static {
		JedisPoolConfig config = new JedisPoolConfig();
		//最大连接数
		config.setMaxTotal(MAX_ACTIVE);
		//最多空闲实例
		config.setMaxIdle(MAX_IDLE);
		//超时时间
		config.setMaxWaitMillis(MAX_WAIT);
		//
		config.setTestOnBorrow(false);
		pool = new JedisPool(config, ADDR, PORT, 1000);
	}
 
	/**
	 * 获取jedis实例
	 */
	protected static synchronized Jedis getJedis() {
		Jedis jedis = null;
		try {
			jedis = pool.getResource();
		} catch (Exception e) {
			e.printStackTrace();
			if (jedis != null) {
				pool.returnBrokenResource(jedis);
			}
		}
		return jedis;
	}
 
	/**
	 * 释放jedis资源
	 * 
	 * @param jedis
	 * @param isBroken
	 */
	protected static void closeResource(Jedis jedis, boolean isBroken) {
		try {
			if (isBroken) {
				pool.returnBrokenResource(jedis);
			} else {
				pool.returnResource(jedis);
			}
		} catch (Exception e) {
 
		}
	}
 
	/**
	 *  是否存在key
	 * 
	 * @param key
	 */
	public static boolean existKey(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		try {
			jedis = getJedis();
			jedis.select(0);
			return jedis.exists(key);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return false;
	}
 
	/**
	 *  删除key
	 * 
	 * @param key
	 */
	public static void delKey(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		try {
			jedis = getJedis();
			jedis.select(0);
			jedis.del(key);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
	}
 
	/**
	 *  取得key的值
	 * 
	 * @param key
	 */
	public static String stringGet(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		String lastVal = null;
		try {
			jedis = getJedis();
			jedis.select(0);
			lastVal = jedis.get(key);
			jedis.expire(key, expireTime);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return lastVal;
	}
 
	/**
	 *  添加string数据
	 * 
	 * @param key
	 * @param value
	 */
	public static String stringSet(String key, String value) {
		Jedis jedis = null;
		boolean isBroken = false;
		String lastVal = null;
		try {
			jedis = getJedis();
			jedis.select(0);
			lastVal = jedis.set(key, value);
			jedis.expire(key, expireTime);
		} catch (Exception e) {
			e.printStackTrace();
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return lastVal;
	}
 
	/**
	 *  添加hash数据
	 * 
	 * @param key
	 * @param field
	 * @param value
	 */
	public static void hashSet(String key, String field, String value) {
		boolean isBroken = false;
		Jedis jedis = null;
		try {
			jedis = getJedis();
			if (jedis != null) {
				jedis.select(0);
				jedis.hset(key, field, value);
				jedis.expire(key, expireTime);
			}
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
	}
}

注意:

1,客户端的Jedis连接不同于项目里的Jedis连接需要Spring注解,直接使用静态方法就可以。

运行

1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序

注意

1,如果数据正在更新时,canal服务端挂掉,客户端也会跟着挂掉,mysql依然在插入数据,而redis则因为客户端的关闭而停止更新,造成mysql和redis的数据不一致。解决方法:重启canal服务端和客户端

解析:虽然canal服务端解析binlog后不会把数据持久化,但canal服务端会记录每次客户端消费的位置(客户端每次ack时服务端会记录pos点)。虽然canal服务端因为重启之前解析数据清空,但因为canal服务端记录的是客户端最后一次获取的pos点,canal服务端再从这个pos点开始解析,客户端更新至redis,以达到数据的一致。

2,如果只有一个canal服务端和一个客户端,肯定存在可用性低的问题。有两种解决方法。

法1:用程序来监控canal服务端和客户端,如果挂掉,再重启;
法2:多个canal服务端+zk,将canal服务端的配置文件放在zk,任何一个canal服务端挂掉后,切换到其他canal服务端,读到的配置文件的内容就是一致的(还有记录的消费pos点),保证业务的高可用,客户端可使用相同的做法。
见《从Paxos到Zookeeper 分布式一致性原理与实践》=> 6.3.3 案例三 基于MySQL Binlog的增量订阅和消费组件:Canal

其他网址

使用canal进行mysql数据同步到Redis_数据库_华仔的逆袭的专栏-程序员宅基地
利用Canal完成Mysql数据同步Redis_数据库_南山行者-程序员宅基地

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/feiying0canglang/article/details/105611369

智能推荐

JavaScript学习笔记_curry函数未定义-程序员宅基地

文章浏览阅读343次。五种原始的变量类型1.Undefined--未定义类型 例:var v;2.String -- ' '或" "3.Boolean4.Number5.Null--空类型 例: var v=null;Number中:NaN -- not a number非数本身是一个数字,但是它和任何数字都不相等,代表非数,它和自己都不相等判断是不是NaN不能用=_curry函数未定义

兑换码编码方案实践_优惠券编码规则-程序员宅基地

文章浏览阅读1.2w次,点赞2次,收藏17次。兑换码编码设计当前各个业务系统,只要涉及到产品销售,就离不开大大小小的运营活动需求,其中最普遍的就是兑换码需求,无论是线下活动或者是线上活动,都能起到良好的宣传效果。兑换码:由一系列字符组成,每一个兑换码对应系统中的一组信息,可以是优惠信息(优惠券),也可以是相关奖品信息。在实际的运营活动中,要求兑换码是唯一的,每一个兑换码对应一个优惠信息,而且需求量往往比较大(实际上的需求只有预期_优惠券编码规则

c语言周林答案,C语言程序设计实训教程教学课件作者周林ch04结构化程序设计课件.ppt...-程序员宅基地

文章浏览阅读45次。C语言程序设计实训教程教学课件作者周林ch04结构化程序设计课件.ppt* * 4.1 选择结构程序设计 4.2 循环结构程序设计 4.3 辅助控制语句 第四章 结构化程序设计 4.1 选择结构程序设计 在现实生活中,需要进行判断和选择的情况是很多的: 如果你在家,我去拜访你 如果考试不及格,要补考 如果遇到红灯,要停车等待 第四章 结构化程序设计 在现实生活中,需要进行判断和选择的情况..._在现实生活中遇到过条件判断的问

幻数使用说明_ioctl-number.txt幻数说明-程序员宅基地

文章浏览阅读999次。幻数使用说明 在驱动程序中实现的ioctl函数体内,实际上是有一个switch{case}结构,每一个case对应一个命令码,做出一些相应的操作。怎么实现这些操作,这是每一个程序员自己的事情。 因为设备都是特定的,这里也没法说。关键在于怎样组织命令码,因为在ioctl中命令码是唯一联系用户程序命令和驱动程序支持的途径 。 命令码的组织是有一些讲究的,因为我们一定要做到命令和设备是一一对应的,利_ioctl-number.txt幻数说明

ORB-SLAM3 + VScode:检测到 #include 错误。请更新 includePath。已为此翻译单元禁用波浪曲线_orb-slam3 include <system.h> 报错-程序员宅基地

文章浏览阅读399次。键盘按下“Shift+Ctrl+p” 输入: C++Configurations,选择JSON界面做如下改动:1.首先把 “/usr/include”,放在最前2.查看C++路径,终端输入gcc -v -E -x c++ - /usr/include/c++/5 /usr/include/x86_64-linux-gnu/c++/5 /usr/include/c++/5/backward /usr/lib/gcc/x86_64-linux-gnu/5/include /usr/local/_orb-slam3 include 报错

「Sqlserver」数据分析师有理由爱Sqlserver之十-Sqlserver自动化篇-程序员宅基地

文章浏览阅读129次。本系列的最后一篇,因未有精力写更多的入门教程,上篇已经抛出书单,有兴趣的朋友可阅读好书来成长,此系列主讲有理由爱Sqlserver的论证性文章,希望读者们看完后,可自行做出判断,Sqlserver是否真的合适自己,目的已达成。渴望自动化及使用场景笔者所最能接触到的群体为Excel、PowerBI用户群体,在Excel中,我们知道可以使用VBA、VSTO来给Excel带来自动化操作..._sqlsever 数据分析

随便推点

智慧校园智慧教育大数据平台(教育大脑)项目建设方案PPT_高校智慧大脑-程序员宅基地

文章浏览阅读294次,点赞6次,收藏4次。教育智脑)建立学校的全连接中台,对学校运营过程中的数据进行处理和标准化管理,挖掘数据的价值。能:一、原先孤立的系统聚合到一个统一的平台,实现单点登录,统一身份认证,方便管理;三、数据共享,盘活了教育大数据资源,通过对外提供数。的方式构建教育的通用服务能力平台,支撑教育核心服务能力的沉淀和共享。物联网将学校的各要素(人、机、料、法、环、测)全面互联,数据实时。智慧校园解决方案,赋能教学、管理和服务升级,智慧教育体系,该数据平台具有以下几大功。教育大数据平台底座:教育智脑。教育大数据平台,以中国联通。_高校智慧大脑

编程5大算法总结--概念加实例_算法概念实例-程序员宅基地

文章浏览阅读9.5k次,点赞2次,收藏27次。分治法,动态规划法,贪心算法这三者之间有类似之处,比如都需要将问题划分为一个个子问题,然后通过解决这些子问题来解决最终问题。但其实这三者之间的区别还是蛮大的。贪心是则可看成是链式结构回溯和分支界限为穷举式的搜索,其思想的差异是深度优先和广度优先一:分治算法一、基本概念在计算机科学中,分治法是一种很重要的算法。字面上的解释是“分而治之”,就是把一个复杂的问题分成两_算法概念实例

随笔—醒悟篇之考研调剂_考研调剂抑郁-程序员宅基地

文章浏览阅读5.6k次。考研篇emmmmm,这是我随笔篇章的第二更,原本计划是在中秋放假期间写好的,但是放假的时候被安排写一下单例模式,做了俩机试题目,还刷了下PAT的东西,emmmmm,最主要的还是因为我浪的很开心,没空出时间来写写东西。  距离我考研结束已经快两年了,距离今年的考研还有90天左右。  趁着这个机会回忆一下青春,这一篇会写的比较有趣,好玩,纯粹是为了记录一下当年考研中发生的有趣的事。  首先介绍..._考研调剂抑郁

SpringMVC_class org.springframework.web.filter.characterenco-程序员宅基地

文章浏览阅读438次。SpringMVC文章目录SpringMVC1、SpringMVC简介1.1 什么是MVC1.2 什么是SpringMVC1.3 SpringMVC的特点2、HelloWorld2.1 开发环境2.2 创建maven工程a>添加web模块b>打包方式:warc>引入依赖2.3 配置web.xml2.4 创建请求控制器2.5 创建SpringMVC的配置文件2.6 测试Helloworld2.7 总结3、@RequestMapping注解3.1 @RequestMapping注解的功能3._class org.springframework.web.filter.characterencodingfilter is not a jakart

gdb: Don‘t know how to run. Try “help target“._don't know how to run. try "help target".-程序员宅基地

文章浏览阅读4.9k次。gdb 远程调试的一个问题:Don't know how to run. Try "help target".它在抱怨不知道怎么跑,目标是什么. 你需要为它指定target remote 或target extended-remote例如:target extended-remote 192.168.1.136:1234指明target 是某IP的某端口完整示例如下:targ..._don't know how to run. try "help target".

c语言程序设计教程 郭浩志,C语言程序设计教程答案杨路明郭浩志-程序员宅基地

文章浏览阅读85次。习题 11、算法描述主要是用两种基本方法:第一是自然语言描述,第二是使用专用工具进行算法描述2、c 语言程序的结构如下:1、c 语言程序由函数组成,每个程序必须具有一个 main 函数作为程序的主控函数。2、“/*“与“*/“之间的内容构成 c 语言程序的注释部分。3、用预处理命令#include 可以包含有关文件的信息。4、大小写字母在 c 语言中是有区别的。5、除 main 函数和标准库函数以..._c语言语法0x1e