记录下较为完整的Rabbitmq学习笔记_编写将mq对象保存到数据库进行人工检查数据完整性-程序员宅基地

技术标签: java  rabbitmq  

Ribbitmq概括

概念

    消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

看场景理解mq

如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。

消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。

Windows环境安装

rabbitMQ是Erlang语言开发的所以先下载
Erlang:下载地址
双击安装完成后
1、配置环境变量

系统变量
ERLANG_HOME
D:\java\erl-24.0
环境变量 
%ERLANG_HOME%\bin

2、windows打开cmd控制台输入cmd,测试输入erl出现一下内容

Eshell V12.0  (abort with ^G)
1>

3、下载RabbitMQ
Rabbitmq:下载地址
4、双击安装
安装完成后,开始安装RabbitMQ-Plugins插件

先cd D:\java\RabbitMQ Server\rabbitmq_server-3.9.4\sbin

然后运行命令:rabbitmq-plugins enable rabbitmq_management

出现一下画面成功

Enabling plugins on node rabbit@WNDN-750:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@WNDN-750...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
started 3 plugins.

5、执行rabbitmqctl status,出现以下内容,说明成功

Status of node rabbit@WNDN-750 ...
[1mRuntime[0m
OS PID: 14016
OS: Windows
Uptime (seconds): 185
Is under maintenance?: false
RabbitMQ version: 3.9.4
Node name: rabbit@WNDN-750
Erlang configuration: Erlang/OTP 24 [erts-12.0] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit]
Erlang processes: 402 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60
[1mPlugins[0m
...............

6、运行 D:\java\RabbitMQ Server\rabbitmq_server-3.9.4\sbin\rabbitmq-server.bat

等几秒钟,在浏览器访问http://localhost:15672/

successful……

Linux环境安装

Docker installation

1)yum 包更新到最新
> yum update
(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
(3)设置yum源为阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)安装docker
> yum install docker-ce -y
(5)安装后查看docker版本
> docker -v
 (6) 安装加速镜像
 sudo mkdir -p /etc/docker
 sudo tee /etc/docker/daemon.json <<-'EOF'
 {
    
  "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
 }
 EOF
 sudo systemctl daemon-reload
 sudo systemctl restart docker
 (7) 获取rabbit镜像:
> docker pull rabbitmq:management
 (8)创建并运行容器
> docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e           RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
 (9)查看日志
> docker logs -f myrabbit
 (10)查看服务
> docker ps -a
 (11)关闭容器
> docker be9df4f0292e stop

Other commands

# 启动docker:
systemctl start docker
# 停止docker:
systemctl stop docker
# 重启docker:
systemctl restart docker
# 查看docker状态:
systemctl status docker
# 开机启动:  
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help

rabbitmq修改密码

  1. 在所有应用中找到rabbitMQ command promot程序并单击单开。
  2. 在打开的命令窗口中输入rabbitmqctl add_user test 123456后回车,test为新增登录账户,123456为账户密码。
  3. 然后再敲入rabbitmqctl set_user_tags test administrator后回车。
  4. 再给test账户设置 操作主机的权限。敲入rabbitmqctl set_permissions -p / test “.“ “.“ “.*”。
  5. 回到登录页面,用账户名为test,密码为123456进行登录就ok了。

Ribbitmq队列

消息队列协议
AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。
在这里插入图片描述
面试题:为什么ribbitmq不使用http协议

  1. 因为http协议请求头很复杂,包含了cookies,数据的加密解密,状态码等附加功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,他其实就负责数据传递,存储,分发就可以了,一定要追求的是高性能,尽量简洁,快速。
  2. 大部分情况下http都是短连接,在交互过程中可能因为服务器宕机中断以后就不会进行持久化,就会会照成请求的丢失,这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期获取消息得过程,出现问题和故障要对数据或消息进行持久化等,目的就是为了保障数据得高可靠和稳健的运行。

消息队列持久化
RabbitMQ在两种情况下会将消息写入磁盘:

  1. 消息本身在 publish 的时候就要求消息写入磁盘;
  2. 内存紧张 需要将部分内存中的消息转移到磁盘;
    在这里插入图片描述
    消息队列消费策略
    MQ消息队列有如下几个角色
    1:生产者
    2:存储消息
    3:消费者
    那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。

场景1
在这里插入图片描述
比如我在APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论。

场景2
在这里插入图片描述
在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发。

消息队列高可用和高可靠
什么是高可用机制

高可用是指产品在规定的条件和规定的时刻或者时间内处于可执行规定功能状态的能力。

当业务量增加时,请求也过大,一台消息中间件的服务器会触及硬件(CPU、内存、磁盘)的极限,一台消息中 间件的服务器已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的。

什么是高可靠

在高并发应用场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。

保证中间件消息的可靠性尼?可从两个方面考虑

1:消息的传输:通过协议来保证系统间数据解析的正确性。

2、消息的存储可靠:通过持久化来保证消息的可靠性。

反正终归三句话:
1:要么消息共享,
2:要么消息同步
3:要么元数据共享

五种工作模式

    1、生产者:生成消息,发送到交换机

交换机:根据消息属性,将消息发送给队列(如果没有声明交换机,则使用默认交换机)

​ 消费者:监听这个队列,发现消息后,获取消息执行消费逻辑

​ 应用场景:

​ 常见的应用场景就是一发,一接的结构

​ 例如:

​ 手机短信,邮件单发

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 完成简单模式一发一接的结构
 */
public class SimpleMode {
    
    //初始化连接对象 短连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //测试包含3个方法
    //声明组件,交换机和队列,简单模式案例,交换机使用默认交换机.队列需要声明
    @Test
    public void myQueueDeclare() throws IOException {
    
        //调用channel的方法,声明队列
        channel.queueDeclare(
                "simple",//设置路由key
                false,//boolean类型,队列是否持久化
                false,//boolean类型,队列是否专属,
                // 只有创建声明队列的连接没有断开,队列才可用
                false,//boolean类型,队列是否自动删除.从第一个消费端监听队列开始
                //计算,直到最后一个消费端断开连接,队列就会自动删除
                null);//map类型,key值是固定一批属性
        System.out.println("队列声明成功");
    }
    //发送消息到队列 生产端,永远不会吧消息直接发给队列,发给交换机
    //目前可以使用7个交换机来接收消息
    @Test
    public void send() throws IOException {
    
        //准备个消息 发送的是byte[]
        String msg="宝贝一到手,风紧扯呼";
        byte[] msgByte=msg.getBytes();
        //将消息发给(AMQP DEFAULT)交换机 名字""
        channel.basicPublish(
                "",//发送给的交换机的名字,默认为空
                "simple",//路由key,你想让交换机把消息传递给哪个队列的名称
                null,//发送消息时,携带的头,属性等.例如
                // app_id content-type priority优先级
                msgByte//消息体
        );
    }
    //消费端
    @Test
    public void consume() throws IOException {
    
        //消费消息
        channel.basicConsume("simple", false,
                new DeliverCallback() {
    
                    /**传递回调对象. 消息就在这个对象里
                     * @param consumerTag 当前消费端id
                     * @param message 封装了消息的对象
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
    
                //从消息对象中拿到信息
                byte[] body = message.getBody();
                System.out.println(new String(body));
                //如果autoAck false说明消费完消息,需要手动确认
                channel.basicAck(
                        message.getEnvelope().getDeliveryTag(),
                        false);
            }
        }, new CancelCallback() {
    
                    /**
                     * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用
                     * @param consumerTag 消费端id
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag) throws IOException {
    
            }
        });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while(true);
    }
}
    当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

默认轮询,以下为

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author: 学相伴-飞哥
 * @description: Producer 简单队列生产者
 * @Date : 2021/3/2
 */
public class Producer {
    
    pub
        try {
    
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) {
    
                //消息的内容
                String msg = "学相伴:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息发送成功!");
    }
}

消费者1的逻辑

Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
    
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
    
                    try{
    
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    }catch(Exception ex){
    
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
    
                @Override
                public void handle(String s) throws IOException {
    
                }
            });

消费者2的逻辑

Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
    
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
    
                    try{
    
                        System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    }catch(Exception ex){
    
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
    
                @Override
                public void handle(String s) throws IOException {
    
                }
            });

工作争抢
生产者:发送消息到交换机

交换机:根据消息属性将消息发送给队列

消费者:多个消费者,同时绑定监听一个队列,之间形成了争抢消息的效果

应用场景

​ 抢红包

​ 资源分配

代码实现

package cn.tedu.test.rabbit;
import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 完成一发多抢的结构
 */
public class WorkMode {
    
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    @Test
    public void myQueueDeclare() throws IOException {
    
        //调用channel的方法,声明队列
        channel.queueDeclare(
                "work",
                false,
                false,
                false,
                null);
        System.out.println("队列声明成功");
    }
    @Test
    public void send() throws IOException {
    
        //准备个消息 发送的是byte[]
        String msg="宝贝一到手,风紧扯呼1111";
        byte[] msgByte=msg.getBytes();
        //将消息发给(AMQP DEFAULT)交换机 名字""
        channel.basicPublish(
                "",//发送给的交换机的名字
                "work",//路由key,你想让交换机把消息传递给哪个队列的名称
                null,//发送消息时,携带的头,属性等.例如
                // app_id content-type priority优先级
                msgByte//消息体
        );
    }
    //消费端
    @Test
    public void consume01() throws IOException {
    
        //消费消息
        channel.basicConsume("work", false,
                new DeliverCallback() {
    
                    /**传递回调对象. 消息就在这个对象里
                     * @param consumerTag 当前消费端id
                     * @param message 封装了消息的对象
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
    
                //从消息对象中拿到信息
                byte[] body = message.getBody();
                System.out.println("消费者01:"+new String(body));
                //如果autoAck false说明消费完消息,需要手动确认
                channel.basicAck(
                        message.getEnvelope().getDeliveryTag(),
                        false);
            }
        }, new CancelCallback() {
    
                    /**
                     * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用
                     * @param consumerTag 消费端id
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag) throws IOException {
    
            }
        });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while(true);
    }
    @Test
    public void consume02() throws IOException {
    
        //消费消息
        channel.basicConsume("work", false,
                new DeliverCallback() {
    
                    /**传递回调对象. 消息就在这个对象里
                     * @param consumerTag 当前消费端id
                     * @param message 封装了消息的对象
                     * @throws IOException
                     */
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
    
                        //从消息对象中拿到信息
                        byte[] body = message.getBody();
                        System.out.println("消费者02:"+new String(body));
                        //如果autoAck false说明消费完消息,需要手动确认
                        channel.basicAck(
                                message.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
    
                    /**
                     * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用
                     * @param consumerTag 消费端id
                     * @throws IOException
                     */
                    @Override
                    public void handle(String consumerTag) throws IOException {
    
                    }
                });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while(true);
    }
}

路由模式
生产端:发送的消息携带具体的路由key值

交换机:接收路由key值,判断和当前交换机绑定后端队列哪个满足路由的匹配将消息发送给这个队列

应用场景

处理一些特殊的消息逻辑,可以经过路由的筛选

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 路由模式
 */
public class DirectMode {
    
    //初始化连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="direct";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    //声明三个组件 一个交换机 2个队列
    @Test
    public void declare() throws IOException {
    
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"北京");
        channel.queueBind(QUEUE01,EX_NAME,"广州");
        channel.queueBind(QUEUE02,EX_NAME,"上海");
    }
    @Test
    public void send() throws IOException {
    
        String msg="你好,路由模式交换机";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"北京",null,bytes);
    }
}

发布订阅
生产端:发送消息到交换机

交换机:由于是发布订阅模式,会将这个消息发送同步到后端所有与其绑定的队列

消息端:简单模式 1个队列绑定一个消费者 争抢模式 1个队列绑定多个消费者

应用场景

邮件的群发,广告的群发

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 一发多接的队列结构
 */
public class FanoutMode {
    
    //初始化连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(15672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="fanout";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    //声明三个组件 一个交换机 2个队列
    @Test
    public void declare() throws IOException {
    
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"");
        channel.queueBind(QUEUE02,EX_NAME,"");
    }
    @Test
    public void send() throws IOException {
    
        String msg="你好,发布订阅模式";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"北京",null,bytes);
    }
}

主题模式
结构

交换机绑定队列,不在使用具体的路由key,可以使用符号代替路由key值的规则

#:表示任意多级的任意长度的字符串

*:任意长度字符串,但是只有一级

​ 中国.北京.朝阳.望京.葫芦村

 匹配到 中国.#
 匹配到 中国.上海.#
 匹配到 中国.*.*.*
 匹配到 中国.*.朝阳.*.*

应用场景

实现多级传递的路由筛选工作,记录trace过程.

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 主题模式
 */
public class TopicMode {
    
    //初始化连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.91.151");
        factory.setPort(5672);
        factory.setUsername("tedu");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="topic";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    //声明三个组件 一个交换机 2个队列
    @Test
    public void declare() throws IOException {
    
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"中国.北京.#");
        channel.queueBind(QUEUE01,EX_NAME,"中国.*.*.*.*");
        channel.queueBind(QUEUE02,EX_NAME,"*.上海.#");
    }
    @Test
    public void send() throws IOException {
    
        String msg="你好,路由模式交换机";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"中国.北京.大兴.亦庄",null,bytes);
    }
}

SpringBoot整合rabbitmq

Fanout
使用springboot完成rabbitmq的消费-Fanout

整合业务逻辑图
在这里插入图片描述
实现步骤

  1. 创建Spring Initinlizr项目 —producer

    勾选web+rabbitmq组件,编写yml配置

    编写下单业务逻辑接口

@Component
public class OrderService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private String exchangeName = "fanout_order_exchange";
    private String routeKey = "";
    public void makeOrder(Long userId, Long productId, int num) {
    
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

创建配置类完成队列和交换机,并完成绑定

@Configuration
public class DirectRabbitConfig {
    
    @Bean
    public Queue emailQueue() {
    
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
    
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
    
        return new Queue("weixin.fanout.queue", true);
    }
    @Bean
    public DirectExchange fanoutOrderExchange() {
    
        return new DirectExchange("fanout_order_exchange", true, false);
    }
    @Bean
    public Binding bindingDirect1() {
    
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect2() {
    
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect3() {
    
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
    }
}

编写一个发送消息的测试类

@SpringBootTest
class SpringBootOrderRabbitmqProducerApplicationTests {
    
    @Autowired
    OrderService orderService;
    @Test
    void contextLoads() throws InterruptedException {
    
        for (int i = 0; i < 10; i++) {
    
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrder(userId, productId, num);
        }
    }
}

创建Spring Initinlizr项目 —consumer

编写一个监听消息队列的接口,监视指定队列,并消费消息

@RabbitListener(queues = "weixin.fanout.queue")
@Component
public class EmailController {
    
    @RabbitHandler
    public void  messagerevice(String msg){
    
        System.out.println("邮件发送消息:"+msg);
    }
}
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class SMSController {
    
    @RabbitHandler
    public void  smsrevice(String msg){
    
        System.out.println("sms发送消息:"+msg);
    }
}
@RabbitListener(queues = "weixin.fanout.queue")
@Component
public class WechatController {
    
    @RabbitHandler
    public void  messagerevice(String msg){
    
        System.out.println("微信发送消息:"+msg);
    }
}

Direct
direct和fanout模式的区别

定义交换机的名字不同

绑定关系时添加了路由key

pull消息到queue时,指定了路由key
实现逻辑:

  1. 创建Spring Initinlizr项目 —comsumer

    勾选web+rabbitmq组件,编写yml配置

    编写下单业务逻辑接口

public void makeOrderDirect(String userId, String productId, int num) {
    
        private String routeKey1 = "sms";
        private String routeKey2 = "email";
        private String DirectExchangeName = "direct_order_exchange";
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(DirectExchangeName, routeKey1, orderNumer);
        rabbitTemplate.convertAndSend(DirectExchangeName, routeKey2, orderNumer);
    }

创建配置类完成队列和交换机,并完成绑定

@Configuration
public class DirectRabbitConfig {
    
    @Bean
    public Queue emailQueueDirect() {
    
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue smsQueueDirect() {
    
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue weixinQueueDirect() {
    
        return new Queue("weixin.direct.queue", true);
    }
    @Bean
    //区别1
    public DirectExchange directOrderExchange() {
    
        return new DirectExchange("direct_order_exchange", true, false);
    }
    @Bean
    public Binding bindingDirect1Direct() {
    
        return BindingBuilder.bind(weixinQueueDirect()).to(directOrderExchange()).with("weixin");//区别2
    }
    @Bean
    public Binding bindingDirect2Direct() {
    
        return BindingBuilder.bind(smsQueueDirect()).to(directOrderExchange()).with("sms");
    }
    @Bean
    public Binding bindingDirect3Direct() {
    
        return BindingBuilder.bind(emailQueueDirect()).to(directOrderExchange()).with("email");
    }
}

编写一个发送消息的测试类

    @Test
    void contextLoads1() throws InterruptedException {
    
        orderService.makeOrderDirect("1","1",12);
    }
    @Test
    void contextLoads2() throws InterruptedException {
    
        orderService.makeOrderDirect("1","1",12);
    }

创建Spring Initinlizr项目 —consumer

编写一个监听消息队列的接口,监视指定队列,并消费消息

@RabbitListener(queues = "weixin.direct.queue")
@Component
public class EmailControllerDirect {
    
    @RabbitHandler
    public void  emailDirectRevice(String msg){
    
        System.out.println("direct->>>>>>>>>邮件发送消息:"+msg);
    }
}
@RabbitListener(queues = "sms.direct.queue")
@Component
public class SMSControllerDirect {
    
    @RabbitHandler
    public void  smsDirectRevice(String msg){
    
        System.out.println("direct->>>>>>>>>sms发送消息:"+msg);
    }
}
@RabbitListener(queues = "weixin.direct.queue")
@Component
public class WechatControllerDirect {
    
    @RabbitHandler
    public void  emailDirectRevice(String msg){
    
        System.out.println("direct->>>>>>>>>微信发送消息:"+msg);
    }
}

Topic
topic和direct区别

发送消息根据模糊路由匹配

没有定义配置类,绑定逻辑通过注解编写在消费端
实现逻辑

  1. 创建Spring Initinlizr项目 —comsumer

    勾选web+rabbitmq组件,编写yml配置

    编写下单业务逻辑接口

public void makeOrderTopic(String userId, String productId, int num) {
    
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        /**
         *  *.email.#
         *  #.sms.#
         *  com.#
         */
        String topicExchangeName = "topic_order_exchange";
        String routeKey = "com"; //输出:topic ->>>>>>>>>微信发送消息:7aefec2c-60da-404c-ba71-cce63839c74f
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(topicExchangeName, routeKey, orderNumer);
    }

编写一个发送消息的测试类

 @Test
    void contextLoads2Topic() throws InterruptedException {
    
        orderService.makeOrderTopic("1","1",12);
    }

创建Spring Initinlizr项目 —consumer

编写一个监听消息队列的接口,监视指定队列,并消费消息

@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "email.topic.queue",autoDelete = "false",durable = "true"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),key = "*.email.#"
))
@Component
public class EmailControllerTopic {
    
    @RabbitHandler
    public void  emailTopicRevice(String msg){
    
        System.out.println("topic->>>>>>>>>邮件发送消息:"+msg);
    }
}
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "sms.topic.queue",autoDelete = "false",durable = "true"),
        exchange = @Exchange(value = "topic_order_exchange",
                type = ExchangeTypes.TOPIC),key = "#.sms.#"
))
@Component
public class SMSControllerTopic {
    
    @RabbitHandler
    public void  smsTopicRevice(String msg){
    
        System.out.println("topic->>>>>>>>>sms发送消息:"+msg);
    }
}
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "weixin.topic.queue",autoDelete = "false",durable = "true"),
        exchange = @Exchange(value = "topic_order_exchange",
                type = ExchangeTypes.TOPIC),key = "com.#"
))
@Component
public class WechatControllerTopic {
    
    @RabbitHandler
    public void  emailTopicRevice(String msg){
    
        System.out.println("topic ->>>>>>>>>微信发送消息:"+msg);
    }
}

ttl过期时间

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

实现设置队列过期时间
配置类

@Configuration
public class ttlRabbitmqConfig {
    
    @Bean
    public Queue queue1(){
    
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("ttl.queue",true,false,false,args);
    }
    @Bean
    public DirectExchange ttlExchange() {
    
        return new DirectExchange("ttl_order_exchange", true, false);
    }
    @Bean
    public Binding bindingExchange() {
    
        return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
    }
}

业务层

public void makeOrderTtl(String userId, String productId, int num) {
    
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("user:"+orderNumer);
        String routeKey = "ttl";
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
    }

测试类

   @Test
    void contextLoads2Ttl() throws InterruptedException {
    
        orderService.makeOrderTtl("1","1",12);
    }

消费者监视类

@RabbitListener(queues = "ttl.queue")
@Component
public class ttlController {
    
    @RabbitHandler
    public void  ttlRevice(String msg){
    
        System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
    }
}

实现设置消息过期机制
producer代码

    @Bean
    public Queue messageQueue() {
    
        return new Queue("message.queue",true);
    }
    @Bean
    public DirectExchange messageOrderExchange() {
    
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("message_order_exchange", true, false);
    }
     @Bean
    public Binding bindingMessage() {
    
        return BindingBuilder.bind(messageQueue()).to(messageOrderExchange()).with("message");
    }

producer发送消息代码

public void ttlOrder(String userId, String productId, int num) {
    
        String exchangeName = "message_order_exchange";
        String routeKey = "message";
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
    
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("utf-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routeKey,userId, messagePostProcessor);
    }

consumer消费者监听方法

@RabbitListener(queues = "message.queue")
@Component
public class messageController {
    
    @RabbitHandler
    public void messageRevice(String msg) {
    
        System.out.println("message->>>>消费消息");
    }
}

生产者测试类发送消息

@Test
    void contextLoads1() {
    
        orderService.ttlOrder("1"," 1",12);
    }

死信队列案例
概念

当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。

消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度
当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。
在这里插入图片描述
生产者配置类

@Bean
    public Queue queue1(){
    
        //做了参数的变更和消费不会失败,会报错
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        args.put("x-dead-letter-routing-key","dead");
        return new Queue("ttl.queue",true,false,false,args);
    }
    @Bean
    public DirectExchange ttlExchange() {
    
        return new DirectExchange("ttl_order_exchange", true, false);
    }
    @Bean
    public Binding bindingExchange() {
    
        return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
    }

生产者发送消息业务

 //队列过期
    public void makeOrderTtl(String userId, String productId, int num) {
    
        String ttlExchangeName = "ttl_order_exchange";
        String routeKey = "ttl";
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("user:"+orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
    }

生产者发送消息测试类

 @Autowired
    private TtlService ttlService;
    @Test
    void contextLoads2Ttl() throws InterruptedException {
    
        ttlService.makeOrderTtl("1","1",12);
    }

消费者

@RabbitListener(queues = "ttl.queue")
public class TtlController {
    
    @RabbitHandler
    public void  ttlRevice(String msg){
    
        System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
    }
}

Rabbitmq分布式事务

美团业务架构图
在这里插入图片描述
系统间调用过程中事务回滚问题
在这里插入图片描述
订单服务

系统结构

order-service

   entity:    OrderDataBaseService
   pojo:    Order
   mapper:    OrderMapper
   service:    OrderService
   test:    OrderServiceApplicationTests

sql脚本

CREATE TABLE `order_service` (
  `order_id` int(50) DEFAULT NULL,
  `user_id` int(50) DEFAULT NULL,
  `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

pom.xml

srpingboot-web+rabbitmq+mybatis+jdbc+mysql+org.apache.common+com.fasterxml.jackson.dataformat

application.properties

server.port=8082
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cn_tedu_order?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
mybatis.mapper-locations=classpath:mapper/*.xml
mybatis.type-aliases-package=cn.tedu.orderservice.pojo
# Mybatis显示sql语句输出的配置
logging.level.cn.tedu.mybatis.mapper=TRACE

order-service

entity:    OrderDataBaseService
@Component
public class OrderDataBaseService {
    
    @Autowired
    private OrderMapper orderMapper;
    public int saveOrder(Order orderInfo){
    
        int i = orderMapper.saveOrder(orderInfo);
        return i;
    }
}
mapper:    OrderMapper
@Repository
public interface OrderMapper {
    
    @Insert("insert into order_service values(#{orderId},#{userId},#{orderContent},#{createTime})")
    int saveOrder(Order order);
}
 pojo:    Order
@Lombok
public class Order {
    
    private int orderId;
    private int userId;
    private String orderContent;
    private String createTime;
}
service:    OrderService
@Service
public class OrderService {
    
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    // 创建订单
    @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务
    public void createOrder(Order orderInfo) throws Exception {
    
        // 1: 订单信息--插入丁订单系统,订单数据库事务
        int i = orderDataBaseService.saveOrder(orderInfo);
        // 2:通過Http接口发送订单信息到运单系统
        int id = orderInfo.getOrderId();
        System.out.println("id:"+id);
        String result = dispatchHttpApi(id);
        if(!"User added successfully".equals(result)) {
    
            throw new Exception("订单创建失败,原因是运单接口调用失败!");
        }
    }
    /**
     *  模拟http请求接口发送,运单系统,将订单号传过去 springcloud
     * @return
     */
    private String dispatchHttpApi(int orderId) {
    
        /**
         * 情况1: 关闭远程服务:ConnectException: Connection refused: connect
         *
         */
        SimpleClientHttpRequestFactory factory  = new SimpleClientHttpRequestFactory();
        // 链接超时 > 3秒
        factory.setConnectTimeout(3000);
        // 处理超时 > 2秒
        factory.setReadTimeout(2000);
        // 发送http请求
        String url = "http://localhost:8081/dispatcher/order?orderId="+orderId;
        RestTemplate restTemplate = new RestTemplate(factory);//异常
        String result = restTemplate.getForObject(url, String.class);
        return result;
    }
}
test:    OrderServiceApplicationTests
@SpringBootTest
class OrderServiceApplicationTests {
    
    @Autowired
    private OrderService orderService;
    @Test
    void actionTest1() throws Exception {
    
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateNow = sdf.format(new Date());
        orderService.createOrder(new Order(0,0,UUID.randomUUID().toString(),dateNow));
    }
}

配送中心

系统结构

dispacher-service

   mapper:    DispacherDao
   pojo:    Dispacher
   service:    DispacherService
   web:    DispacherController

sql文件

CREATE TABLE `dispacher_service` (
  `dispacher_id` int(50) DEFAULT NULL,
  `order_id` int(50) DEFAULT NULL,
  `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `user_id` int(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8
_bin;
 mapper:    DispacherDao
@Repository
public interface DispatcherDao {
    
//    @Insert("insert into dispacher_service values(#{dispacherId},#{orderId},#{orderContent},#{createTime},#{userId})")
    int insertUser(Dispacher dispacher);
}
pojo:    Dispacher
@Lombok
public class Dispacher  {
    
    private int dispacherId;
    private int orderId;
    private String orderContent;
    private String createTime;
    private int userId;
}
  service:    DispacherService
@Service
//@Transactional(rollbackFor = Exception.class)
public class DispatcherService {
    
    @Autowired
    private DispatcherDao dispatcherDao;
    public boolean dispatcher(int orderId) {
    
        Dispacher dispacher = new Dispacher();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateNow = sdf.format(new Date());
        dispacher.setCreateTime(dateNow);
        dispacher.setOrderId(orderId);
        dispacher.setDispacherId(orderId);
        dispacher.setOrderContent(UUID.randomUUID().toString());
        dispacher.setUserId(orderId);
        if(dispacher==null){
    
            return false;
        }else {
    
            int i = dispatcherDao.insertUser(dispacher);
            System.out.println("影响行数:"+i);
        }
        return true;
    }
}
web:    DispacherController
@RestController
@RequestMapping("/dispatcher")
public class DispatcherController {
    
    @Autowired
    DispatcherService dispatcherService;
    @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务
    @GetMapping("/order")
    public String lock(int orderId) throws InterruptedException {
    
        boolean dispatcher = dispatcherService.dispatcher(orderId);
        System.out.println("result:"+dispatcher);
        if(dispatcher){
    
            return "User added successfully";
        }
        return "Failed to add user";
    }
}

基于MQ的分布式事务消息的可靠生产问题

整体设计思路
在这里插入图片描述
存在的问题:

过程: 用户下单->保存到数据库->派发订单信息->保存订单信息

描述: 上面使用事务回滚导致的信息数据两个模块信息不能保持一致,即一个成功一个失败,带给用户的体验非常的差劲

解决方案:

使用mq解决

过程:用户下单->保存到数据库->新增派单信息到数据库->发送下单数据到mq保存->新增发送数据到mq的冗余 表,状态记录,->根据mq中的ack获取是否发送到mq,来决定冗余表里面的状态
在这里插入图片描述
以下代码基于以上回滚问题结构

sql脚本

CREATE TABLE `order_service_message` (
  `order_id` int(50) DEFAULT NULL,
  `status` int(50) DEFAULT NULL,
  `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `unique_id` int(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

pom.xml

 springboot+rabbitmq+mybaits+mysql+fastjson+test

properties

server.port=8082
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cn_tedu_order?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirm-type=correlated
mybatis.mapper-locations=classpath:mapper/*.xml
mybatis.type-aliases-package=cn.tedu.orderservice.pojo
# Mybatis显示sql语句输出的配置
logging.level.cn.tedu.mybatis.mapper=TRACE

entity: OrderDataBaeService

@Service
public class OrderDataBaseService {
    
    @Autowired
    private OrderMapper orderMapper;
    public int saveOrder(Order orderInfo) throws Exception {
    
        int i1 = orderMapper.saveOrder(orderInfo);
        orderMapper.saveOrderMessage(
                new MessageOrder(orderInfo.getOrderId(),0,orderInfo.getOrderContent(),orderInfo.getUserId())
        );
        if(i1!=1){
    
            throw new Exception("Filed DatabasesError Action");
        }
        return i1;
    }
    public int updateStatus(int orderId) throws Exception {
    
        int i3 = orderMapper.updateStatus(orderId);
        if(i3!=1){
    
            throw new Exception("The Search Data Is Empty");
        }
        return i3;
    }
}

mapper: OrderMappe

@Repository
public interface OrderMapper {
    
    @Insert("insert into order_service values(#{orderId},#{userId},#{orderContent},#{createTime})")
    int saveOrder(Order order);
    @Insert("insert into order_service_message values(#{orderId},#{status},#{orderContent},#{uniqueId})")
    int saveOrderMessage(MessageOrder order);
    @Update("update order_service_message set status = 1 where order_id=#{orderId}")
    int updateStatus(int orderId);
}

pojo: MessageOrder

@Lombok
public class MessageOrder implements Serializable {
    
    private int orderId;
    private int status;
    private String orderContent;
    private int uniqueId;
    }
Order
@Lombok
public class Order implements Serializable {
    
    private int orderId;
    private int userId;
    private String orderContent;
    private String createTime;
    }

service: OrderService

@Service
public class OrderService {
    
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    public void createOrder(Order orderInfo) throws Exception {
    
        // 1: 订单信息--插入丁订单系统,订单数据库事务
        int i = orderDataBaseService.saveOrder(orderInfo);
        if(i!=1){
    
            throw new Exception("添加用户失败");
        }
        // 2:通過Http接口发送订单信息到运单系统
        int id = orderInfo.getOrderId();
        System.out.println("id:"+id);
        String result = dispatchHttpApi(id);
        if(!"User added successfully".equals(result)) {
    
            throw new Exception("订单创建失败,原因是运单接口调用失败!");
        }
    }
    /**
     *  模拟http请求接口发送,运单系统,将订单号传过去 springcloud
     * @return
     */
    private String dispatchHttpApi(int orderId) {
    
        /**
         * 情况1: 关闭远程服务:ConnectException: Connection refused: connect
         *
         */
        SimpleClientHttpRequestFactory factory  = new SimpleClientHttpRequestFactory();
        // 链接超时 > 3秒
        factory.setConnectTimeout(3000);
        // 处理超时 > 2秒
        factory.setReadTimeout(2000);
        // 发送http请求
        String url = "http://localhost:8081/dispatcher/order?orderId="+orderId;
        RestTemplate restTemplate = new RestTemplate(factory);//异常
        String result = restTemplate.getForObject(url, String.class);
        return result;
    }
}
 TestOrderServcie
@Service
public class TestOrderService {
    
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    //构造函数执行完成之后执行的方法,init
    @PostConstruct
    public void regCallback(){
    
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
                System.out.println("cause:"+cause);
                String id = correlationData.getId();
                int orderId = Integer.parseInt(id);
                //如果ack为true表示消息已收到
                if(!ack){
    
                    System.out.println("下单信息投递到mq失败");
                    return;
                }
                try {
    
                    // 1: 订单信息--插入丁订单系统,订单数据库事务
                    int i = orderDataBaseService.updateStatus(orderId);
                    if(i==1){
    
                        System.out.println("修改状态成功,成功投递到mq");
                    }
                }catch (Exception e){
    
                    System.out.println("本地消息状态修改异常");
                }
            }
        });
    }
    //
    public void sendMessage(Order orderInfo){
    
        String userJson = JSON.toJSONString(orderInfo);
        int orderId = orderInfo.getOrderId();
        String s = String.valueOf(orderId);
        rabbitTemplate.convertAndSend("save-order-exchange","",userJson,new CorrelationData(s));
    }
}

基于MQ的分布式事务消息的可靠消费
可靠消费会出现的问题:

消费失败,触发mq循环重试的机制

新增消费者的代码

@Component
public class OrderConsumer {
    
    private DispatcherService dispatcherService;
    private int count = 1;
    @RabbitListener(queues = "save-order-queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            System.out.println("收到mq的消息是:"+orderMsg+",count = "+count++);
            ;
            Order order = JSONObject.parseObject(orderMsg,Order.class);
            int orderId = order.getOrderId();
            boolean dispatcher = dispatcherService.dispatcher(orderId);
            if(dispatcher){
    
                System.out.println("消费者:ok");
            }else {
    
                System.out.println("消费者error");
            }
    }
}

解决方案:

1、控制消费常识获取次数
在这里插入图片描述
新增代码

#手动ack开启
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts= 3
#间隔时间
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms

测试…

2、手动获取ack,加上try/catch
在这里插入图片描述
变动的代码

@Component
public class OrderConsumer {
    
    private DispatcherService dispatcherService;
    private int count = 1;
    @RabbitListener(queues = "save-order-queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
        try {
    
            System.out.println("收到mq的消息是:"+orderMsg+",count = "+count++);
            ;
            Order order = JSONObject.parseObject(orderMsg,Order.class);
            int orderId = order.getOrderId();
            boolean dispatcher = dispatcherService.dispatcher(orderId);
            if(dispatcher){
    
                System.out.println("消费者:ok");
            }else {
    
                System.out.println("消费者error");
            }
            channel.basicAck(tag,false);
        } catch (Exception e) {
    
            /**
             * tag      消息的tag
             * false    不会重发,会把消息打入到死信队列
             * requeue  true会死循环的重发,建议如果使用true的话,不要try/catch,否则照成死循环
             */
            channel.basicNack(tag,false,false);
        }
    }
}

基于重试的配置文件,添加以上代码测试…

3、try/catch+手动获取ack+死信队列
在这里插入图片描述
根据以上两种情况配置,添加如下配置完成分布式事务数据一致问题解决

//死信队列管理
@Configuration
public class OrderProducer {
    
    @Bean
    public Queue deadQueue(){
    
        return new Queue("dead-queue",true);
    }
    @Bean
    public DirectExchange deadExchange() {
    
        return new DirectExchange("dead-exchange", true, false);
    }
    @Bean
    public Binding deadBindingExchange() {
    
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("");
    }
    @Bean
    public Queue orderQueue(){
    
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange","dead-exchange");
        return new Queue("save-order-queue",true,false,false,args);
    }
    @Bean
    public DirectExchange orderExchange() {
    
        return new DirectExchange("save-order-exchange", true, false);
    }
    @Bean
    public Binding orderBingExchange() {
    
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("");
    }
}

以下可以不用添加测试

@Component
public class DeadConsumer {
    
    private DispatcherService dispatcherService;
    private int count = 1;
    @RabbitListener(queues = "save-order-queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            System.out.println("收到mq的消息是:"+orderMsg+",count = "+count++);
            Order order = JSONObject.parseObject(orderMsg,Order.class);
            //编写将mq对象保存到数据库进行人工检查数据完整性~~~
    }
}

秒杀实战

实现逻辑:

 1. 生产端发送消息,传递,谁秒杀了什么
 2. 获取消息,将消息中split出来的数据作为减去库存的条件
 3. 封装数据写入到秒杀成功的数据库表中

前置条件

环境搭建

​ springboot+mybaits+redis+rabbitmq+mysql+springbootest

properties

server.port=10007
#datasource
spring.datasource.url=jdbc:mysql:///seckill
spring.datasource.password=root
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#mybatis
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.type-aliases-package=com.jt.common.pojo
mybatis.mapper-locations=classpath:/mappers/*.xml
#微服务配置
spring.application.name=seckill-service
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
#rabbitmq
spring.rabbitmq.host=10.9.182.139
#redis
spring.redis.cluster.nodes=10.9.182.139:8000

sql脚本

/*
SQLyog Ultimate - MySQL GUI v8.2 
MySQL - 5.5.27 : Database - seckill
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`seckill` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `seckill`;
/*Table structure for table `seckill` */
DROP TABLE IF EXISTS `seckill`;
CREATE TABLE `seckill` (
  `seckill_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '商品库存id',
  `name` varchar(120) NOT NULL COMMENT '商品名称',
  `number` int(11) NOT NULL COMMENT '库存数量',
  `initial_price` bigint(20) NOT NULL COMMENT '原价',
  `seckill_price` bigint(20) NOT NULL COMMENT '秒杀价',
  `sell_point` varchar(500) NOT NULL COMMENT '卖点',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '秒杀创建时间',
  `start_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '秒杀开始时间',
  `end_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '秒杀结束时间',
  PRIMARY KEY (`seckill_id`),
  KEY `idx_create_time` (`create_time`),
  KEY `idx_start_time` (`start_time`),
  KEY `idx_end_time` (`end_time`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='秒杀库存表';
/*Data for the table `seckill` */
insert  into `seckill`(`seckill_id`,`name`,`number`,`initial_price`,`seckill_price`,`sell_point`,`create_time`,`start_time`,`end_time`) values (1,'oppo10',719,2000,1000,'1000元成功秒杀oppo10','2018-05-17 11:12:49','2019-05-09 13:13:49','2020-05-18 00:00:00'),(2,'荣耀8',80,1800,800,'800元成功秒杀荣耀8','2018-01-21 22:08:49','2018-01-23 00:00:00','2018-01-24 00:00:00'),(3,'iPhone6',60,1600,600,'600元成功秒杀iPhone6','2018-01-21 22:08:49','2018-01-24 00:00:00','2018-01-25 00:00:00'),(4,'小米4',40,1400,400,'400元成功秒杀小米4','2018-01-21 22:08:49','2018-01-25 00:00:00','2018-01-26 00:00:00'),(5,'vivo2',20,1200,200,'200元成功秒杀vivo2','2018-01-21 22:08:49','2018-01-26 00:00:00','2018-01-27 00:00:00'),(6,'魅族1',10,1000,100,'100元成功秒杀魅族1','2018-01-21 22:08:49','2018-01-27 00:00:00','2018-01-28 00:00:00');
/*Table structure for table `success` */
DROP TABLE IF EXISTS `success`;
CREATE TABLE `success` (
  `success_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '秒杀成功id',
  `seckill_id` bigint(20) NOT NULL COMMENT '秒杀商品id',
  `user_phone` bigint(20) NOT NULL COMMENT '用户手机号',
  `state` tinyint(4) NOT NULL DEFAULT '-1' COMMENT '状态标志:-1:无效;0:成功;1:已付款;2:已发货',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '秒杀成功创建时间',
  PRIMARY KEY (`success_id`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=6382 DEFAULT CHARSET=utf8 COMMENT='秒杀成功明细表';
/*Data for the table `success` */
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

前端页面

<!DOCTYPE html>
<html>
<!-- jQuery文件。务必在bootstrap.min.js 之前引入 -->
<script src="/js/jquery.min.js"></script>
<!-- 最新的 Bootstrap 核心 JavaScript 文件 -->
<script src="/js/bootstrap.min.js"></script>
<meta charset="UTF-8">
<head>
    <title>商品详情页</title>
    <style>
        .disappearBtn{
      
            display:none;
        }
        .showBtn{
      
            display:block;
        }
    </style>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <!-- 引入 Bootstrap -->
    <link
            href="/css/seckill.css"
            rel="stylesheet">
    <script type="text/javascript">
        //重复执行success方法;生产环境下不建议使用,很耗费性能
        var seckillId="";
        $(function() {
      
            var localUrl= window.location.href;
            var args=localUrl.substr(localUrl.lastIndexOf("?")+1);
            if(!args.match("^seckillId=[0-9]*$")){
      
                alert("你的参数有问题,按默认查询seckill=1")
                args="seckillId=1";
            }
            alert(args);
            queryDetail(args);
            /*$("#seckill-btn").click(function() {
                $.ajax({
                    url: '/seckill/${seckill.seckillId}',
                    type: 'GET',
                    dataType: 'text',
                    success: function(result) {
                        alert(result);
                        console.log(result)
                    }
                });
            });*/
        });
        function startSeckill(){
      
            $.ajax({
      
                url:"http://www.easymall.com/seckills/"+seckillId,
                dataType:"json",
                type:"GET",
                success:function(data){
      
                    if(data.status==200){
      
                        alert("秒杀成功");
                    }else if(data.status=202){
      
                        alert("兄弟你没有登录吧,那我哪知道你是谁?");
                        window.location.href="./login.html";
                    }else{
      
                        alert("秒杀失败,原因未知,算你倒霉");
                        windwo.location.href="./seckill-list.html";
                    }
            },
                error:function(){
      
                    alert("发送失败");
                }
            });
        }
        function queryDetail(args){
      
            $.ajax({
      
                url:"http://www.easymall.com/seckills/detail?"+args,
                dataType:"json",
                type:"GET",
                success:function (data) {
      
                    if(data!=null){
      
                        $("#seckill_content").append("<h1>"+data.name+"<small>(秒杀价"+data.seckillPrice+"元)</small></h1>");
                        var nowTime=new Date().getTime();
                       /* alert(data.startTime);
                        alert(data.endTime);
                        alert(nowTime);*/
                        seckillId=data.seckillId;
                        countdown(nowTime,data.startTime,data.endTime);
                    }else{
      
                        alert("数据有问题");
                    }
                },
                error:function () {
      
                    alert("发送失败");
                }
            })
        }
        function countdown(nowTime,startTime,endTime){
      
            /*alert("开始计算");*/
            var countdownBtn = $('#countdown-btn');
            if(nowTime>endTime){
      
                countdownBtn.html('秒杀结束');
            }else if(nowTime<startTime){
      
                var killTime = new Date(startTime);
                countdownBtn.countdown(killTime,function(event){
      
                    var format = event.strftime('秒杀倒计时:%D天 %H时 %M分 %S秒');
                    countdownBtn.html(format);
                }).on('finish.countdown',function(){
      
                    //倒计时结束后回调事件
                    $('#countdown-div').addClass('disappearBtn');
                    $('#seckill-div').addClass('showBtn');
                })
            }else{
      
                //执行秒杀
                $('#countdown-div').addClass('disappearBtn');
                $('#seckill-div').addClass('showBtn');
            }
        }
        function success(sekillId){
      
            $.ajax({
      
                url: "http://www.easymall.com/seckills/"+seckillId+"/userPhone",
                type: 'GET',
                dataType: 'json',
                success: function(result) {
      
                    //console.log(result);
                    var i=0;
                    var str ="";
                    for(i=0;i<result.length;i++){
      
                        str += "非常感谢您参与本次秒杀活动,恭喜手机号为"+result[i]+"的幸运用户${seckill.sellPoint}<br/>";
                    }
                    document.getElementById("showSuccess").innerHTML=str;
                }
            });
        }
        window.setInterval("success(seckillId)",1000);
    </script>
</head>
<body>
<div class="container">
    <div class="panel panel-default text-center">
        <div id="seckill_content" class="pannel-heading">
        </div>
        <div class="panel-body"  id="countdown-div">
            <button type="button" class="btn  btn-lg btn-block btn-danger" id="countdown-btn"></button>
        </div>
        <div class="panel-body disappearBtn"  id="seckill-div">
            <button type="button" class="btn btn-primary btn-lg btn-block btn-info" id="seckill-btn" onclick="startSeckill()">开始进入商品秒杀环节</button>
        </div>
        <div id="showSuccess"></div>
    </div>
</div>
</body>
<!-- jQuery countDown倒计时插件 -->
<script src="/js/jquery.countdown.js"></script>
<!-- 计时交互逻辑 -->
<script>
</script>
</html>

js效果展示

function success(sekillId){
    
    $.ajax({
    
        url: "http://www.easymall.com/seckills/"+seckillId+"/userPhone",
        type: 'GET',
        dataType: 'json',
        success: function(result) {
    
            //console.log(result);
            var i=0;
            var str ="";
            for(i=0;i<result.length;i++){
    
                str += "非常感谢您参与本次秒杀活动,恭喜手机号为"+result[i]+"的幸运用户${seckill.sellPoint}<br/>";
            }
            document.getElementById("showSuccess").innerHTML=str;
        }
    });}

接口文档

js请求地址    http://www.easymall/seckills/{seckillId}
后台接收    /seckill/manage/{seckillId}
请求方式    Get
请求参数    Long seckillId 路径传参
返回数据    SysResult的返回对象
Integer status 200表示秒杀成功
String msg:ok表示成功
Object data:其他数据
备注    根据cookie获取用户信息,拼接用户数据到消息中,绑定一个秒杀的商品

后台代码

consumer

@Component
public class SeckillConsumer {
    
    /*
        创建秒杀中,消费端的消费逻辑代码
        1.参数就是消息msg userPhone+"/"+seckillId
        解析字符串 userPhone seckillId
        2.利用seckillId对该商品实现减库存操作,mybatis,insert update
        这种写操作会直接将 1/0 rows affected封装到返回结果
        条件: seckillId >0 时间限制 当前系统时间必须 大于start 小于
        end
        3.判断成功失败,记录日志,记录数据,success对象封装写入数据库
        为后续逻辑做准备
     */
    @Autowired(required = false)
    private SeckillMapper seckillMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RabbitListener(queues = "seckill_q")
    public void seckill(String msg){
    
        //解析
        Long userPhone=Long.parseLong(msg.split("/")[0]);
        Long seckillId=Long.parseLong(msg.split("/")[1]);
        //执行数据库减库存操作
        Date nowTime=new Date();
        //在消费端真正减库存之前,先到redis执行incr命令
        String seckillKey="seckill_"+seckillId;
        Integer number = NumberUtils.NUMBER.get(seckillKey);
        Long increment = redisTemplate.opsForValue()
                .increment(seckillKey);
        if(increment>number){
    
            //已经从redis减库存完事了
            System.out.println(
                    "该用户:"
                    +userPhone
                    +"秒杀商品:"
                    +seckillId
                    +"失败,卖完了");
            return;
        }
        int result
                =seckillMapper.updateNumber(seckillId,nowTime);
        if(result==0){
    
            System.out.println(
                    "该用户:"
                    +userPhone
                    +"秒杀商品:"
                    +seckillId
                    +"失败,卖完了");
            return;
        }
        //成功减库存 封装数据,写入数据库success表格
        Success suc=new Success();
        suc.setCreateTime(nowTime);
        suc.setSeckillId(seckillId);
        suc.setUserPhone(userPhone);
        suc.setState(0);
        seckillMapper.insertSuccess(suc);
    }
}

controller

@RestController
@RequestMapping("/seckill/manage")
public class SeckillController {
    
    /**
     * 查询所有秒杀商品list列表
     * 请求地址:/list
     * 请求参数:null
     * 返回数据:List<Seckill>
     */
    @Autowired(required = false)
    private SeckillMapper seckillMapper;
    @RequestMapping("/list")
    public List<Seckill> list(){
    
        return seckillMapper.selectSeckills();
    }
    /**
     * 根据list列表中展示的商品,点击详情
     * 请求地址:/detail
     * 请求参数:Long seckillId
     * 返回数据:Seckill
     */
    @RequestMapping("/detail")
    public Seckill detail(Long seckillId){
    
        return seckillMapper.selectSeckillById(seckillId);
    }
    /**
     * 发起秒杀请求
     * 请求地址:/{sekcillId}
     * 请求参数:路径参数
     * 返回数据:sysResult 200
     */
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/{seckillId}")
    public SysResult startSeckill(@PathVariable String seckillId){
    
        //生产端发送消息,传递,谁秒杀了什么
        String userPhone="1330668"+(new Random().nextInt(9000)+1000);
        //模拟每次访问的人不一样,有可能一样
        String msg=userPhone+"/"+seckillId;
        /*限制一人秒杀商品多次
            使用msg 作为redis的key值,判断key值是否在redis存在
            决定是否限制本次请求秒杀向后发送消息
         */
        Long increment = redisTemplate.opsForValue().increment(msg);
        if(increment>1){
    
            //第二次以上相同用户秒杀同一个商品了
            return SysResult.build(201,"占便宜没够",null);
        }
        //将其发送给seckill_q
        template.convertAndSend(
                "seckill_ex",
                "seckill",
                msg);
        return SysResult.ok();
    }
    /*
        展示成功者信息
        请求地址:/{seckillId}/userPhone
        请求参数:路径参数Long seckillId
        返回数据:List<String> 包含了所有的成功者电话
     */
    @RequestMapping("/{seckillId}/userPhone")
    public List<String> successList(@PathVariable Long seckillId){
    
        return seckillMapper.selectUserphonesById(seckillId);
    }
}

mapper

public interface SeckillMapper {
    
    List<Seckill> selectSeckills();
    Seckill selectSeckillById(Long seckillId);
    int updateNumber(@Param("seckillId") Long seckillId,
                     @Param("nowTime")Date nowTime);
    void insertSuccess(Success suc);
    List<String> selectUserphonesById(Long seckillId);
}

mapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.tedu.seckill.mapper.SeckillMapper">
    <!--查询秒杀商品列表-->
    <select id="selectSeckills" resultType="Seckill">
        select * from seckill;
    </select>
    <!--利用id查询单个商品-->
    <select id="selectSeckillById" resultType="Seckill">
        select * from seckill where seckill_id=#{seckillId};
    </select>
    <!--更新库存 减库存-->
    <update id="updateNumber">
        update seckill set number=number-1 where
        seckill_id =#{seckillId} AND
        number &gt; 0 and
        #{nowTime} &gt; start_time and
        #{nowTime} &lt; end_time;
    </update>
    <!--新增入库-->
    <insert id="insertSuccess">
        insert into success (seckill_id,user_phone,create_time,state)
        values (#{seckillId},#{userPhone},#{createTime},#{state});
    </insert>
    <!--展示成功信息-->
    <select id="selectUserphonesById" resultType="String">
        select user_phone from success
        where seckill_id=#{seckillId};
    </select>
</mapper>
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012386080/article/details/120172355

智能推荐

18个顶级人工智能平台-程序员宅基地

文章浏览阅读1w次,点赞2次,收藏27次。来源:机器人小妹  很多时候企业拥有重复,乏味且困难的工作流程,这些流程往往会减慢生产速度并增加运营成本。为了降低生产成本,企业别无选择,只能自动化某些功能以降低生产成本。  通过数字化..._人工智能平台

electron热加载_electron-reloader-程序员宅基地

文章浏览阅读2.2k次。热加载能够在每次保存修改的代码后自动刷新 electron 应用界面,而不必每次去手动操作重新运行,这极大的提升了开发效率。安装 electron 热加载插件热加载虽然很方便,但是不是每个 electron 项目必须的,所以想要舒服的开发 electron 就只能给 electron 项目单独的安装热加载插件[electron-reloader]:// 在项目的根目录下安装 electron-reloader,国内建议使用 cnpm 代替 npmnpm install electron-relo._electron-reloader

android 11.0 去掉recovery模式UI页面的选项_android recovery 删除 部分菜单-程序员宅基地

文章浏览阅读942次。在11.0 进行定制化开发,会根据需要去掉recovery模式的一些选项 就是在device.cpp去掉一些选项就可以了。_android recovery 删除 部分菜单

mnn linux编译_mnn 编译linux-程序员宅基地

文章浏览阅读3.7k次。https://www.yuque.com/mnn/cn/cvrt_linux_mac基础依赖这些依赖是无关编译选项的基础编译依赖• cmake(3.10 以上)• protobuf (3.0 以上)• 指protobuf库以及protobuf编译器。版本号使用 protoc --version 打印出来。• 在某些Linux发行版上这两个包是分开发布的,需要手动安装• Ubuntu需要分别安装 libprotobuf-dev 以及 protobuf-compiler 两个包•..._mnn 编译linux

利用CSS3制作淡入淡出动画效果_css3入场效果淡入淡出-程序员宅基地

文章浏览阅读1.8k次。CSS3新增动画属性“@-webkit-keyframes”,从字面就可以看出其含义——关键帧,这与Flash中的含义一致。利用CSS3制作动画效果其原理与Flash一样,我们需要定义关键帧处的状态效果,由CSS3来驱动产生动画效果。下面讲解一下如何利用CSS3制作淡入淡出的动画效果。具体实例可参考刚进入本站时的淡入效果。1. 定义动画,名称为fadeIn@-webkit-keyf_css3入场效果淡入淡出

计算机软件又必须包括什么,计算机系统应包括硬件和软件两个子系统,硬件和软件又必须依次分别包括______?...-程序员宅基地

文章浏览阅读2.8k次。计算机系统应包括硬件和软件两个子系统,硬件和软件又必须依次分别包括中央处理器和系统软件。按人的要求接收和存储信息,自动进行数据处理和计算,并输出结果信息的机器系统。计算机是脑力的延伸和扩充,是近代科学的重大成就之一。计算机系统由硬件(子)系统和软件(子)系统组成。前者是借助电、磁、光、机械等原理构成的各种物理部件的有机组合,是系统赖以工作的实体。后者是各种程序和文件,用于指挥全系统按指定的要求进行..._计算机系统包括硬件系统和软件系统 软件又必须包括

随便推点

进程调度(一)——FIFO算法_进程调度fifo算法代码-程序员宅基地

文章浏览阅读7.9k次,点赞3次,收藏22次。一 定义这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单,只需把一个进程已调入内存的页面,按先后次序链接成一个队列,并设置一个指针,称为替换指针,使它总是指向最老的页面。但该算法与进程实际运行的规律不相适应,因为在进程中,有些页面经常被访问,比如,含有全局变量、常用函数、例程等的页面,FIFO 算法并不能保证这些页面不被淘汰。这里,我_进程调度fifo算法代码

mysql rownum写法_mysql应用之类似oracle rownum写法-程序员宅基地

文章浏览阅读133次。rownum是oracle才有的写法,rownum在oracle中可以用于取第一条数据,或者批量写数据时限定批量写的数量等mysql取第一条数据写法SELECT * FROM t order by id LIMIT 1;oracle取第一条数据写法SELECT * FROM t where rownum =1 order by id;ok,上面是mysql和oracle取第一条数据的写法对比,不过..._mysql 替换@rownum的写法

eclipse安装教程_ecjelm-程序员宅基地

文章浏览阅读790次,点赞3次,收藏4次。官网下载下载链接:http://www.eclipse.org/downloads/点击Download下载完成后双击运行我选择第2个,看自己需要(我选择企业级应用,如果只是单纯学习java选第一个就行)进入下一步后选择jre和安装路径修改jvm/jre的时候也可以选择本地的(点后面的文件夹进去),但是我们没有11版本的,所以还是用他的吧选择接受安装中安装过程中如果有其他界面弹出就点accept就行..._ecjelm

Linux常用网络命令_ifconfig 删除vlan-程序员宅基地

文章浏览阅读245次。原文链接:https://linux.cn/article-7801-1.htmlifconfigping &lt;IP地址&gt;:发送ICMP echo消息到某个主机traceroute &lt;IP地址&gt;:用于跟踪IP包的路由路由:netstat -r: 打印路由表route add :添加静态路由路径routed:控制动态路由的BSD守护程序。运行RIP路由协议gat..._ifconfig 删除vlan

redux_redux redis-程序员宅基地

文章浏览阅读224次。reduxredux里要求把数据都放在公共的存储区域叫store里面,组件中尽量少放数据,假如绿色的组件要给很多灰色的组件传值,绿色的组件只需要改变store里面对应的数据就行了,接着灰色的组件会自动感知到store里的数据发生了改变,store只要有变化,灰色的组件就会自动从store里重新取数据,这样绿色组件的数据就很方便的传到其它灰色组件里了。redux就是把公用的数据放在公共的区域去存..._redux redis

linux 解压zip大文件(解决乱码问题)_linux 7za解压中文乱码-程序员宅基地

文章浏览阅读2.2k次,点赞3次,收藏6次。unzip版本不支持4G以上的压缩包所以要使用p7zip:Linux一个高压缩率软件wget http://sourceforge.net/projects/p7zip/files/p7zip/9.20.1/p7zip_9.20.1_src_all.tar.bz2tar jxvf p7zip_9.20.1_src_all.tar.bz2cd p7zip_9.20.1make && make install 如果安装失败,看一下报错是不是因为没有下载gcc 和 gcc ++(p7_linux 7za解压中文乱码