# 消息队列

  • 消息传输过程中,用来保存消息的容器;
  • 消息队列,是一种 应用程序对应用程序的通信方法;
  • 队列的使用,出去了接收和发送应员工程序同时执行的要求;

# RabbitMQ

# 发展历程

整理自玩转RabbitMQ之二:RabbitMQ简史 (opens new window)

  • 一种通用的软件总线,可以 供各种应用程序接入,就像计算机通过硬件总线接入连接各种硬件设备一样
  • 思路来源于硬件总线,开发**软件总线(Teknekron)**并应用于金融交易场所,后来诞生消息队列通信软件的鼻祖: The Information Bus (TIB)
  • 1994年左右,微软与IBM分别研发出 IBM MQ系列和 MSMQ系列。
  • 传统MQ各家厂商标准不一致,存在产品壁垒。为解决这个问题,SUN的工程师创造了JMS。事实证明,使用JMS让应用程序变得更加脆弱.
  • 2004年,为了寻找比JMS更好的通信方案,一群大佬开发出了AMQP(Advanced Message Queuing Protocol),2006年初稿发布
  • 2007年,RabbitMQ之父,开发Metalogic时,积累了Erlang和分布式的经验,偶然接触到AMQP,便通过Erlang来开发了AMQP服务器;

# 发布历史

  • 2007.02: 发布1.0版本
  • 2010.08:发布2.0版本
  • 2012.11:发布3.0版本
  • 2021.07:发布3.9版本

# 特点

  • Erlang语言开发
  • 基于AMQP协议实现
  • 适用于对数据一致性,稳定性和可靠性要求很高的场景
  • 性能和吞吐量 的要求在其次

# 使用场景

  • 解耦的场合:关心的是“通知”,而非“处理”, 下单->出库
  • 异步的场合: 异步并行通知,提升效率
  • 削峰的场合:秒杀场景

# 弊端

  • 可用性降低,MQ挂了以后,会导致系统崩溃
  • 系统复杂性提高: 重复消费问题?如何保证可靠传递?如何保证消息传递顺序?
  • 业务一致性问题:下单发送消息后,直接返回成功。若后续服务失败,会产生数据不一致问题

# 工作模式

整理至: RabbitMQ原理讲解——一只特立独行的狗 (opens new window) rabbitMQ工作工作原理

# 基本概念

  1. Channel(信道): 建立在真实的TCP连接内的虚拟连接,双向数据流通道
  2. Producer(生产者): 向消息队列发布消息的客户端程序
  3. Consumer(消费者): 从消息队列中,获取消息的客户端程序
  4. Message(消息): 由消息头消息体组成; 消息体不透明;消息头由一系列可选属性组成,如: routing-key(路由键)、priority(优先级)、delivery-mode(是否持久化存储)等。
  5. Routing Key(路由键): 消息头的一个属性,用于标记消息的路由规则,决定交换机的转发路径。 最大长度255字节
  6. Queue(消息队列): 存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。,它是消息的容器,也是消息的终点。 一条消息只能被一个订阅者接收
  7. Exchange(交换机|路由器): 提供Producer到Queue之间的匹配,接收生产者发送的消息,并将这些消息按照路由规则,转发到消息队列。如果没有Queue绑定到Exchange,则会直接丢弃消息
  8. Binding(绑定): 用于建立 Exchange 和 queue之间的关联。
  9. Binding Key(绑定键): Exchange与Queue的绑定关系,用于匹配Routing Key。 最大长度255字节。
  10. Broker: RabbitMQ Server,服务器实体

# 消息调度策略

  • 消息的调度策略与三个因素有关: Exchange Type(交换机类型)、Binding Key(交换机和队列的绑定关系)、Routing key(消息的标记信息)
  • Exchange 根据消息的 Routing Key和Exchange绑定Queue的Binding Key分配消息。
  • 生产者在发送消息给Exchange时,一般会指定顶一个Routing key,来指定这个消息的路由规则。它需要与 Exchange Type及Binding Key联合使用才能最终生效。
  • 在Exchange Type与 Bind Key固定的情况下(一般这些内容,都是提前固定配置好的),生产者通过指定Routing Key即可决定消息流向哪里。

# 交换机类型

# Fanout交换机(订阅模式|广播模式)

  • 该种模式下,交换机把所有接收到的消息,发送到所有与该交换机绑定的消息队列中。
  • 此种模式,与BindingKeyRoutingKey无关

# Direct(路由模式)

  • 需要消息的RoutingKey与BindingKey完全匹配时,消息才会被转发到消息队列。
  • 该模式时交换机的默认模式。
  • RabbitMQ默认提供了一个交换机,名字时空字符串,类型是Direct,绑定到所有的Queue,每一个Queue与该无名交换机之间的BindingKey是Queue的名字,因此我们可以不指定交换器也可以发送和接收消息,实际上使用的是默认的Exchange

# Topic(通配符模式|主题模式)

  • 用消息的RoutingKey与BindingKey进行模糊匹配,匹配成功,将消息分发到该Queue
  • RoutingKey是一个句点号"."分隔的字符串,被句点号分隔开的的每一段,称为一个单词
  • BindingKey可以存在两种特殊字符,“*”与“#”,用于模糊匹配。 "*"用于匹配一个单词,"#"用于匹配多个单词

# RPC机制

# RPC机制的必要性

  • MQ本身基于异步进行消息处理,生产者对消息的处理情况无感知;
  • 实际应用场景,可能会出现同步处理的场景,即等待消费者将消息消费后,再进行下一步处理。相当于RPC调用。

# 实现机制

  • 生产者发送消息时,在消息属性中,设置两个值(AMQP协议所定义的),: replyTo(queue的名称,消费者处理完成后通知); correlationId(此次请求的标识号,消费者处理完成后,将此属性一并返回,用于解释消息的唯一性);
  • 消费者收到消息并处理
  • 消费者处理完消息后,生成一条应答消息到 replyTo指定的 Queue,同时带上correlationId属性
  • 生产者之前已订阅replyTo指定的Queue,根据correlationId分析哪条消息被处理了,并执行后续业务处理。

# 消息确认机制

  • 实际业务场景中,可能出现消费者收到Queue的消息,但没处理完后就出现宕机,导致消息丢失。
  • 为了避免这种情况,可以让消费者再消费完消息后,发送一个回执给RabbitMQ,RabbitMQ收到回执后,才将该消息删除。
  • 如果在处理过程中,消费者断开了,则会将该消息发送给其它消费者进行处理。不存在Timeout概念,除非消费者断开
  • 如果开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,则可能产生消息堆积

# 消息持久化机制

  • 当我们希望在RabbitMQ服务重启的情况下,也不会丢失消息时,就可以将Queue与Message设置为可持久化的。
  • 此时依然可能出现小概率丢失事件的发生(接收到生产者消息,还没来得及持久化,就宕机的情况)。
  • 此时可以借助事务,将小概率事件管理起来。

# 事务机制

  • 通过txSelect()可以开启一个事务
  • 生产者发送消息给服务器
  • 通过txCommit()提交事务
  • 当txCommit()提交事务成功,则该消息一定会持久化

# 消息分发机制

  • 一般情况下,生产者生产消息时比较快的,消费者由于涉及到一些业务逻辑处理,消费速度慢于生产速度,容易导致消息堆积;
  • 此时使用工作队列,多个消费者同时消费数据;
  • 工作队列有两种分发数据方式: 轮询分发和公平分发
  • 轮询分发,不会因为多个消费者处理数据的速度不一致,而影响分发结果;
  • 公平分发通常与手动应答配合使用,不会出现消息丢失且业务逻辑没能处理的情况;

# 部署

  • 安装Erlang/OTP库并保持版本匹配,OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库
  • 安装RabbitMQ
  • 安装管理插件rabbitmq-plugins.bat enable rabbitmq_management

# 消费模式

# 无序消费:

多线程并发消费

# 有序消费

  • 生产者角度: 对于需要顺序的消息,发到同一个队列中;
  • 消费者角度: 使用MessageListenerOrderly,此时消费者会定期锁住topic的所有队列,保证只有它在消费,本地则使用单线程保证按序消费

# 集群消费

  • 同一个消费组里的消费者,会分摊消息,比如A消费了,B就不能消费;
  • 默认情况下,采用的是这种方式

# 广播消费

  • 同一个消费组里的消费者,每个消费者都能收到消息,AB能消费同一个消息,相当于广播;
  • 应用场景,如配置刷新,需要更新每一个机器实例;
  • **实际用的比较少,且存在很多问题。网上各种回复不一,实践过程中产生的现象记录一下:**消费者需要先于生产者启动;广播模式下,消费者点位不会移动;无法通过dashborad发送广播消息;

# 客户端应用

# 通用

  • 生产者
package com.automannn.demo.mq.rabbit.common;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author automannn
 * @Date 2022/4/13
 */
@Slf4j
public class SimpleProvider{
    //xxx: 是否持久化
    boolean durable = false;

    //xxx: 队列是否独有(其他信道无法访问)
    boolean exclusive = false;

    //xxx: 是否自动删除队列,队列没有消费者时,删除
    boolean autoDelete = false;

    final private String queueName;

    private Connection connection;

    private Channel channel;

    private String host;
    private int port;
    private String userName;
    private String password;

    public SimpleProvider(String host,int port,String userName,String password,String queueName){
        Assert.isTrue(!StringUtils.isEmpty(queueName),"parameter name cannot be null!");
        this.host = host;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.queueName = queueName;
        this.init();
    }

    public void init(){
        ConnectionFactory factory  = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(userName);
        factory.setPassword(password);

        try {
            connection  = factory.newConnection();
            channel = connection.createChannel();

            channel.queueDeclare(queueName,durable,exclusive,autoDelete,null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void send(String msg){
        Assert.isTrue(!StringUtils.isEmpty(msg),"parameter msg cannot be null!");
        try {
            //xxx: 参数分别为: 交换机,routingKey,消息属性,消息体
            channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));
            log.debug("消息投递成功!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareFanout(String exchangeName){
        try {
            channel.exchangeDeclare(exchangeName,"fanout");
            //默认要绑定本队列
            channel.queueBind(queueName,exchangeName,"");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void publish(String exchangeName,String msg){
        try {
            channel.basicPublish(exchangeName,"",null,msg.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void publish(String exchangeName,String msg,String routingKey){
        try {
            channel.basicPublish(exchangeName,routingKey,null,msg.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void close(){
        try {
            this.channel.close();
            this.connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendRpc(String msg){
        BlockingQueue<String> responseQueue = new ArrayBlockingQueue<String>(1);
        String corrId  = UUID.randomUUID().toString();

        String replyQueueName = "automannnReplyQueue";
        try {
            channel.queueDeclare(replyQueueName,false,false,false,null);
        } catch (IOException e) {
            e.printStackTrace();
        }

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().replyTo(replyQueueName).correlationId(corrId).build();

        try {
            channel.basicPublish("",queueName,properties,msg.getBytes(StandardCharsets.UTF_8));

            String ctag = channel.basicConsume(replyQueueName,true,(tag,delivery)->{
                if(delivery.getProperties().getCorrelationId().equals(corrId)){
                    responseQueue.offer(new String(delivery.getBody(),StandardCharsets.UTF_8));
                }
            },tag->{});
            String result = responseQueue.take();
            System.out.println(result);

            //取消订阅
            channel.basicCancel(ctag);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
  • 消费者
package com.automannn.demo.mq.rabbit.common;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author automannn
 * @Date 2022/4/13
 */
@Slf4j
public class SimpleConsumer{
    private String host;
    private int port;
    private String username;
    private String password;

    private Connection connection;
    private Channel channel;

    private String queueName;

    private AtomicBoolean consuming=new AtomicBoolean(false);

    //业务回调
    private DeliverCallback deliverCallback;

    public SimpleConsumer(String host,int port,String username,String password,String queueName){
        this.host=host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.queueName = queueName;
        this.init();
    }

    private void  init(){
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(queueName,false,false,false,null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void consume(DeliverCallback callback){
        if (consuming.compareAndSet(false, true)) {
            try {
                channel.basicConsume(queueName,true,callback,tag->{});
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public Channel getChannel() {
        return channel;
    }

    public void declareFanout(String exchangeName){
        try {
            channel.exchangeDeclare(exchangeName,"fanout");
            //默认要绑定本队列
            channel.queueBind(queueName,exchangeName,"");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareTopic(String topicExchangeName,String routingKey){
        try {
            channel.exchangeDeclare(topicExchangeName,"topic");
            //默认要绑定本队列
            channel.queueBind(queueName,topicExchangeName,routingKey);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //工作队列方式,手动确认
    public void consumeManualAck(DeliverCallback callback){
        if (consuming.compareAndSet(false, true)) {
            try {
                //xxx: 每次只读取一个消息
                channel.basicQos(1);
                boolean autoAck = false;
                channel.basicConsume(queueName,autoAck,callback,tag->{});
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void close(){
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

# 匿名交换机

package com.automannn.demo.mq.rabbit.simple;

import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author automannn
 * @Date 2022/4/12
 */
@Slf4j
public class SimpleBrokerTest {
    public static void main(String[] args) {
        String queueName = "automannn.testQueue";

        String host = "127.0.0.1";
        int port = 5672;
        String userName = "guest";
        String password = "guest";

        SimpleConsumer consumer  = new SimpleConsumer(host,port,userName,password,queueName);

        DeliverCallback callback = (tag,deliver)->{
            String message = new String(deliver.getBody(),StandardCharsets.UTF_8);
            log.debug("====消费者消费消息======");
            log.debug(message);
        };
        //xxx: 消费
        consumer.consume(callback);

        SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
        provider.send("hello,automannn!");
    }

}

# 工作队列

package com.automannn.demo.mq.rabbit.workQueues;

import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/13
 */
@Slf4j
public class WorkQueueTest {
    public static void main(String[] args) {
        String queueName = "automannn.testQueue";

        String host = "127.0.0.1";
        int port = 5672;
        String userName = "guest";
        String password = "guest";
        SimpleConsumer consumer1  = new SimpleConsumer(host,port,userName,password,queueName);
        DeliverCallback callback1 = (tag, deliver)->{
            String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("====消费者消费消息======");
            log.debug(message);
            boolean multiple = false;
            consumer1.getChannel().basicAck(deliver.getEnvelope().getDeliveryTag(),multiple);
        };
        consumer1.consumeManualAck(callback1);

        SimpleConsumer consumer2  = new SimpleConsumer(host,port,userName,password,queueName);
        DeliverCallback callback2 = (tag, deliver)->{
            String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("====消费者消费消息======");
            log.debug(message);
            boolean multiple = false;
            consumer2.getChannel().basicAck(deliver.getEnvelope().getDeliveryTag(),multiple);
        };
        consumer2.consumeManualAck(callback2);

        SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
        for (int i=0;i<10;i++){
            provider.send("hello,automannn! sequence is "+i);
        }
    }
}

# 扇形交换机

package com.automannn.demo.mq.rabbit.fanout;

import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/13
 */
@Slf4j
public class FanoutExchangeTest {
    public static void main(String[] args) {
        String queueName = "automannn.testQueue";

        String host = "127.0.0.1";
        int port = 5672;
        String userName = "guest";
        String password = "guest";

        String fanoutExchangeName= "automannnFanoutExchange";

        DeliverCallback callback = (tag, deliver)->{
            String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
            log.debug("====消费者消费消息======");
            log.debug(message);
        };

        SimpleConsumer consumer1  = new SimpleConsumer(host,port,userName,password,queueName+"1");
        SimpleConsumer consumer2  = new SimpleConsumer(host,port,userName,password,queueName+"2");
        SimpleConsumer consumer3  = new SimpleConsumer(host,port,userName,password,queueName+"3");
        SimpleConsumer consumer4  = new SimpleConsumer(host,port,userName,password,queueName+"4");
        consumer1.declareFanout(fanoutExchangeName);
        consumer2.declareFanout(fanoutExchangeName);
        consumer3.declareFanout(fanoutExchangeName);
        consumer4.declareFanout(fanoutExchangeName);
        consumer1.consume(callback);
        consumer2.consume(callback);
        consumer3.consume(callback);
        consumer4.consume(callback);

        SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
        provider.declareFanout(fanoutExchangeName);

        provider.publish(fanoutExchangeName,"hello,automannn...");
    }
}

# 主题交换机

package com.automannn.demo.mq.rabbit.topic;

import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/13
 */
@Slf4j
public class TopicExchangeTest {
    public static void main(String[] args) {
        String queueName = "automannn.testQueue";

        String host = "127.0.0.1";
        int port = 5672;
        String userName = "guest";
        String password = "guest";

        String topicExchangeName= "automannnTopicExchange";

        DeliverCallback callback = (tag, deliver)->{
            String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
            log.debug("====消费者消费消息======");
            log.debug(message);
        };

        String topicGroup1  ="user.*";
        String topicGroup2 = "msg.*";


        SimpleConsumer consumer1  = new SimpleConsumer(host,port,userName,password,queueName+"a");
        SimpleConsumer consumer2  = new SimpleConsumer(host,port,userName,password,queueName+"b");
        SimpleConsumer consumer3  = new SimpleConsumer(host,port,userName,password,queueName+"c");
        SimpleConsumer consumer4  = new SimpleConsumer(host,port,userName,password,queueName+"d");

        consumer1.declareTopic(topicExchangeName,topicGroup1);
        consumer1.declareTopic(topicExchangeName,topicGroup2);

        consumer2.declareTopic(topicExchangeName,topicGroup1);

        consumer3.declareTopic(topicExchangeName,topicGroup2);

        consumer4.declareTopic(topicExchangeName,topicGroup1);
        consumer4.declareTopic(topicExchangeName,topicGroup2);


        consumer1.consume(callback);
        consumer2.consume(callback);
        consumer3.consume(callback);
        consumer4.consume(callback);

        SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
        provider.publish(topicExchangeName,"hello,automannn. userTopic","user.dfdfd");
        provider.publish(topicExchangeName,"hello,automannn. msgTopic","msg.ssd");
    }
}

# rpc调用

package com.automannn.demo.mq.rabbit.rpc;

import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/13
 */
@Slf4j
public class RpcTest {
    public static void main(String[] args) {
        String queueName = "automannn.testQueue";

        String host = "127.0.0.1";
        int port = 5672;
        String userName = "guest";
        String password = "guest";

        SimpleConsumer consumer  = new SimpleConsumer(host,port,userName,password,queueName);

        DeliverCallback callback = (tag, deliver)->{
            String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("====消费者消费消息======");
            log.debug(message);
            boolean multiple = false;
            String responseMsg = "消费成功";
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(deliver.getProperties().getCorrelationId()).build();
            consumer.getChannel().basicPublish("",deliver.getProperties().getReplyTo(),replyProps,responseMsg.getBytes(StandardCharsets.UTF_8));
            consumer.getChannel().basicAck(deliver.getEnvelope().getDeliveryTag(),multiple);
        };
        consumer.consume(callback);

        SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
        provider.sendRpc("hello,automannn!");
    }
}

# rocketmq

# 概述

  • 阿里巴巴出品的一款纯JAVA,分布式,队列模型的开源消息中间件

# 发展历程

  • 2007年,阿里开始五彩石项目,Notify作为交易核心信息流转系统诞生,成为RocketMQ的雏形;
  • 2010年,案例大规模使用ActiveMQ作为阿里的消息内核,但它不具备海量堆积能力
  • 2011年,kafka开源,对Kafka进行深入研究后,开发出MetaMQ
  • 2012年,MetaMQ进一步抽象,形成RocketMQ,并开源
  • 2016年,RocketMQ承载万亿级消息流转,11月,捐赠给Apache基金会
  • 2017年,成为Apache的顶级项目;

# 发布历史

  • 2017.01: 4.0发布
  • 2019.01: 4.4发布
  • 2022.03: 5.0发布

# 工作原理

rocketmq工作原理

# 基本概念

  • 消息(Message): 生产和消费数据的最小单位,每条消息必须属于一个主题;
  • 主题(Topic): 表示一类消息的集合;
  • 标签(Tag): 为消息设置的标签,用于区分同一主题下不同类型的消息,(topic是消息的一级分类,tag是消息的二级分类);
  • 队列(Queue): 消息的物理实体;
  • 生产者(Producer): 生产者,可集群部署;
  • 消费者(Consumer): 消费者,可集群部署;
  • Broker: 负责消息的存储,查询消费,支持主从部署;
  • NameServer: Topic路由注册中心,支持Broker的动态注册和发现;

# 安装部署

  • 启动NamingServer :mqnamesrv.cmd
  • 启动mqbroker:mqbroker.cmd
  • 编译并启动rocketmq-dashboard-master

# 应用客户端

  • 通用
package com.automannn.demo.mq.rocket.common;

/**
 * @author automannn
 * @Date 2022/4/14
 */
public interface RocketMqConfigInstant {
    String NAME_SERVER_ADDRESS = "127.0.0.1:9876";
}
package com.automannn.demo.mq.rocket.common;

import org.apache.rocketmq.client.ClientConfig;

/**
 * @author automannn
 * @Date 2022/4/14
 */
public class RocketClientBuilder {

    private RocketClientBuilder(){

    }

    public static RocketClientBuilder getBuilder(){
        return new RocketClientBuilder();
    }

    public ClientConfig build(){
        ClientConfig clientConfig = new ClientConfig();
        //nameServer地址有多个时,用逗号隔开
        clientConfig.setNamesrvAddr(RocketMqConfigInstant.NAME_SERVER_ADDRESS);
        //客户端实例名称
        clientConfig.setInstanceName("DEFAULT");
        //通信层异步回调线程数,默认值4
        clientConfig.setClientCallbackExecutorThreads(10);
        //轮询 Name Server 间隔时间
        clientConfig.setPollNameServerInterval(30000);
        //向broker发送心跳间隔时间
        clientConfig.setHeartbeatBrokerInterval(30000);
        //持久化Consumer 消费进度间隔时间,默认5000
        clientConfig.setPersistConsumerOffsetInterval(10000);
        return clientConfig;
    }
}
package com.automannn.demo.mq.rocket.common;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

/**
 * @author automannn
 * @Date 2022/4/14
 */
public class RocketMqConsumerFactory {
    //主动权在 consumer,consumer主动拉取消息
    public static DefaultMQPullConsumer getPullConsumer(){
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer();
        pullConsumer.resetClientConfig(RocketClientBuilder.getBuilder().build());
        return pullConsumer;
    }

    //主动权在 broker,主动推送消息到consumer
    public static DefaultMQPushConsumer getPushConsumer(){
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
        pushConsumer.resetClientConfig(RocketClientBuilder.getBuilder().build());
        return pushConsumer;
    }
}
package com.automannn.demo.mq.rocket.common;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

/**
 * @author automannn
 * @Date 2022/4/14
 */
public class RocketMqProducerFactory {
    public static DefaultMQProducer getProducer(){
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.resetClientConfig(RocketClientBuilder.getBuilder().build());
        //发送消息时,自动创建服务器不存在的topic,默认创建4个队列
        producer.setDefaultTopicQueueNums(4);
        //发送消息超时时间
        producer.setSendMsgTimeout(10000);
        //消息body压缩阈值,4096
        producer.setCompressMsgBodyOverHowmuch(4096);
        //消息重试
        producer.setRetryAnotherBrokerWhenNotStoreOK(false);
        return producer;
    }
}
package com.automannn.demo.mq.rocket.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author automannn
 * @Date 2022/4/14
 */
@Slf4j
public class ConsumerHandler {
    private static final AtomicInteger instanceNum = new AtomicInteger(6);

    public void handle(String groupName,String topic,String tags){
        DefaultMQPushConsumer pushConsumer = RocketMqConsumerFactory.getPushConsumer();
        pushConsumer.setConsumerGroup(groupName);
        pushConsumer.setInstanceName("instance"+instanceNum.getAndIncrement());
        try {
            pushConsumer.subscribe(topic,tags);
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, context)->{
            context.setAutoCommit(true);
            log.debug(Thread.currentThread().getName()+" receive new messages: "+ msgs+ "%n");
            log.debug(new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        });

        try {
            pushConsumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public void handleBroadCast(String groupName,String topic,String tags){
        DefaultMQPushConsumer pushConsumer = RocketMqConsumerFactory.getPushConsumer();
        //消费模型: 广播模式
        pushConsumer.setMessageModel(MessageModel.BROADCASTING);
        pushConsumer.setConsumerGroup(groupName);
        try {
            pushConsumer.subscribe(topic,tags);
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->{
            log.debug(Thread.currentThread().getName()+" receive new messages: "+ msgs+ "%n");
            log.debug(new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        try {
            pushConsumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}
  • 单向消息
package com.automannn.demo.mq.rocket.onewayMsg;

import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/14
 * @Description 单向传输,中等可靠性,如日志收集
 */

@Slf4j
public class OnewayProducerTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("oneway");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        for (int i=0;i<100;i++){
            Message msg = new Message("OneWayTopic","TagA",("hello,Autmannn:"+i).getBytes(StandardCharsets.UTF_8));
            try {
                producer.sendOneway(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        DefaultMQPushConsumer pushConsumer = RocketMqConsumerFactory.getPushConsumer();
        pushConsumer.setConsumerGroup("oneway");
        try {
            pushConsumer.subscribe("OneWayTopic","TagA || TagB || TagC");
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, context)->{
            context.setAutoCommit(true);
            log.debug(Thread.currentThread().getName()+" receive new messages: "+ msgs+ "%n");
            log.debug(new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        });

        try {
            pushConsumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

//        producer.shutdown();
//        pushConsumer.shutdown();
    }
}
  • 异步可靠消息
package com.automannn.demo.mq.rocket.asyncMsg;

import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/14
 * @Description 可靠异步消息传输: 通常用于响应时间敏感的业务场景
 */
@Slf4j
public class AsyncMsgTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("async");
        producer.setRetryTimesWhenSendAsyncFailed(0);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i=0;i<5;i++){
            final int index =i;
            Message msg = new Message("asyncMessage","TagA","OrderID188",("hello,automann..asyncMsg "+i).getBytes(StandardCharsets.UTF_8));
            try {
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        log.debug("第{}个消息投递成功,消息号为{}",index,sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        ConsumerHandler handler = new ConsumerHandler();
        handler.handle("async","asyncMessage","");

//        producer.shutdown();
    }
}
  • 同步可靠消息
package com.automannn.demo.mq.rocket.syncMsg;

import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/14
 * @Description 可靠同步传输,可用于广泛的场景,如重要消息通知,短信通知,短信营销系统等
 */
@Slf4j
public class SyncMsgTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("sync");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i=0;i<100;i++){
            Message msg = new Message("syncTopic","TagA",("hello,Automannn.Sync msg: "+i).getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = null;
            try {
                sendResult = producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
            log.debug("send success"+sendResult);
        }

        ConsumerHandler handler = new ConsumerHandler();
        handler.handle("sync","syncTopic","");

//        producer.shutdown();
    }
}
  • 延迟消息
package com.automannn.demo.mq.rocket.scheduleMsg;

import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/14
 */
@Slf4j
public class ScheduleMsgTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("schedule");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i=0;i<15;i++){
            long time = System.currentTimeMillis();
            Message msg = new Message("scheduleTopic",("hello,automannn.scheduleMsg:"+time).getBytes(StandardCharsets.UTF_8));
            //延时级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            msg.setDelayTimeLevel(7);
            try {
              SendResult sendResult =  producer.send(msg);
                log.debug("send success"+sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        ConsumerHandler handler = new ConsumerHandler();
        handler.handle("schedule","scheduleTopic","");

        //producer.shutdown();
    }
}
  • 过滤消息
package com.automannn.demo.mq.rocket.filterMsg;

import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/14
 */
@Slf4j
public class FilterMsgTest {
    public static void main(String[] args) {
        DefaultMQProducer producer  = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("filter");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i=0;i<10;i++){
            Message message = new Message("filterTopic","tagA",("hello,filter-"+i).getBytes(StandardCharsets.UTF_8));
            message.putUserProperty("a",String.valueOf(i));
            try {
                SendResult sendResult = producer.send(message);
                log.debug("send success: "+ sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        DefaultMQPushConsumer consumer = RocketMqConsumerFactory.getPushConsumer();
        consumer.setConsumerGroup("filter");
        //当消息中有 a属性,且满足限制条件,才消费
        try {
            consumer.subscribe("filterTopic", MessageSelector.byTag("tagA"));
            //consumer.subscribe("filterTopic", MessageSelector.bySql("a >= 0 and a<= 3"));
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{
            log.debug("receive msg: "+msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }


//        consumer.shutdown();
//        producer.shutdown();

    }
}
  • 有序消息
package com.automannn.demo.mq.rocket.orderMsg;

import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author automannn
 * @Date 2022/4/14
 */
@Slf4j
public class OrderMsgTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("order");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i=0;i<20;i++){
            //根据业务进行定义,相同的id ,会有序分到一个 queue
            int virtualId = i%2;
            long time = System.currentTimeMillis();
            String[] tags = new String[]{"tagA","tagB"};
            Message message = new Message("orderTopic",tags[i%tags.length],"key"+i,("hello,automannn. orderMsg "+time).getBytes(StandardCharsets.UTF_8));

            try {
                SendResult sendResult = producer.send(message,(MessageQueueSelector)(mqs,msg,arg)->{
                    Integer id = (Integer) arg;
                    int index = id%mqs.size();
                    return mqs.get(index);
                }, virtualId);
                log.debug("send success: "+sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        DefaultMQPushConsumer consumer = RocketMqConsumerFactory.getPushConsumer();
        consumer.setConsumerGroup("order");
        try {
            consumer.subscribe("orderTopic","tagA || tagB");
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                consumeOrderlyContext.setAutoCommit(true);
                log.debug("msgs size = "+ list.size());
                for (MessageExt messageExt: list){
                    log.debug("receive new Message: "+messageExt);
                    log.debug(new String(messageExt.getBody()));
                }
                this.consumeTimes.incrementAndGet();
                log.debug("consume times : {}",consumeTimes.get());

                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    consumeOrderlyContext.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

//        consumer.shutdown();
//        producer.shutdown();
    }
}
  • 事务消息
package com.automannn.demo.mq.rocket.transactionMsg;

import com.automannn.demo.mq.rocket.common.RocketMqConfigInstant;
import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author automannn
 * @Date 2022/4/14
 */
@Slf4j
public class TxMsgTest {
    public static void main(String[] args) {
        TransactionCheckListener listener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("transaction");
        //事务回查最小并发数、最大并发数
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setNamesrvAddr(RocketMqConfigInstant.NAME_SERVER_ADDRESS);
        //队列数
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(listener);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        TransactionExecuterImpl executer = new TransactionExecuterImpl();
        for(int i=0;i<4;i++){
            Long time = System.currentTimeMillis();
            Message msg = new Message("transactionTopic","tagA","KEY"+i,("hello,automann.TransactionMsg-"+i).getBytes(StandardCharsets.UTF_8));
            SendResult sendResult= null;
            try {
                sendResult = producer.sendMessageInTransaction(msg,executer,null);
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            log.debug("send success: "+sendResult);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /*===========consumer start===============*/
        DefaultMQPushConsumer consumer = RocketMqConsumerFactory.getPushConsumer();
        consumer.setConsumerGroup("transaction");
        try {
            consumer.subscribe("transactionTopic","tagA");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{
            log.debug("consume success: "+msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

//        consumer.shutdown();

//        producer.shutdown();
    }
    //未决事务,服务器回查客户端
    static class TransactionCheckListenerImpl implements TransactionCheckListener{

        private AtomicInteger transactionIndex = new AtomicInteger(0);

        @Override
        public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
            log.debug("server checking txMsg: "+ messageExt);

            int value = transactionIndex.getAndIncrement();
            if(value==0){
                log.error("count not find db");
                throw new RuntimeException("could not find db");
            }else if(value==1){
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }else if(value==2){
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            return LocalTransactionState.UNKNOW;
        }
    }

    //执行本地事务
    static class TransactionExecuterImpl implements LocalTransactionExecuter{

        private AtomicInteger transactionIndex = new AtomicInteger(1);

        @Override
        public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
            int value = transactionIndex.getAndIncrement();
            log.debug("value == "+value);
            if(value==0){
                log.error("could not find db");
                throw new RuntimeException("could not find db");
            }else if (value==1){
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }else if (value==2){
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            return LocalTransactionState.UNKNOW;
        }
    }
}
  • 广播消息
package com.automannn.demo.mq.rocket.broadcasting;

import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/14
 */
@Slf4j
public class BroadcastingTest {
    public static void main(String[] args) {

        ConsumerHandler handler1 = new ConsumerHandler();
        ConsumerHandler handler2 = new ConsumerHandler();
        handler1.handle("broadcast202204151","broadcastTopic20220415","tagA");
        handler2.handle("broadcast202204152","broadcastTopic20220415","tagA");
//        handler1.handleBroadCast("broadcast202204151","broadcastTopic20220415","tagA");
//        handler2.handleBroadCast("broadcast202204152","broadcastTopic20220415","tagA");


        DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
        producer.setProducerGroup("broadcast20220415");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        Message message = new Message("broadcastTopic20220415","tagA","KEY1","hello,boradcast Msg.the consumer mast start before producer".getBytes(StandardCharsets.UTF_8));
        try {
            for (int i=0;i<1;i++){
                SendResult sendResult= producer.send(message);
                log.debug("send success : "+sendResult );
            }
        } catch (Exception e) {
            e.printStackTrace();
        }



//        producer.shutdown();
    }
}

# kafka

# 概述

  • Apache软件基金会开发的开源流处理平台,由scalajava编写.
  • 高吞吐量的分布式发布订阅消息系统.
  • kafka无特别含义,当时的开发架构师喜欢franz kafka(一个小说家).

# 发展史

  • 2010年左右,Linkedin为了解决数据管道问题,由于ActiveMQ无法满足需求,决定研发自己的消息传递系统;
  • 于2011年,贡献给Apache基金会,2012年,成为apache的顶级开源项目.

# 发布史

  • 0.7,发布于2012.01,是第一个发布版本;
  • 1.0,发布于2017.11
  • 2.0,发布于2018.07
  • 3.0,发布于2021.09
  • 3.1,发布于2022.01,是目前的最新版

# kafka术语

  • 生产者
    • 向broker发布消息的应用程序
    • 同时负责选择发布到topic上的哪一个分区
  • 消费者
    • 从消息队列中,获取消息的客户端程序
    • 若干消费者组成一个消费者组,一条消息只能被一个消费者消费;
    • 所有消费者都在一个组内,则行成queue模型;所有消费者都在不同组,则形成了发布-订阅模型;
  • broker
    • kafka实例,多个broker组成一个kafka集群
    • 通常一台机器部署一个kafka实例
  • topic
    • 一组消息的归纳
    • 服务器端消息的逻辑存储单元。一个topic通常包含若干个Partition分区
  • 分区
    • 一个分区可看作一个队列
    • 若干个分区,可以被若干个consumer同时消费,达到消费者高吞吐量
    • 单个分区有序,整体无序
  • message
    • 消息,也称日志消息,是kafka服务端实际存储的数据
    • 每一条消息都由一个key,一个value,以及时间戳组成
  • offset
    • 偏移量,分区中的消息位置,由kafka自身维护
    • consumer消费时,也要保存一份offset以维护消费的位置信息

# 服务端安装

  • 安装及运行zookeeper
  • 配置日志地址,安装及启动kafka: .\kafka-server-start.bat ../../config/server.properties
  • 验证
    • 创建主题: .\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic automannn
    • 查询主题: .\kafka-topics.bat --list --zookeeper localhost:2181
    • 启动生产者: .\kafka-console-producer.bat --broker-list localhost:9092 --topic automannn
    • 启动消费者: .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic automannn --from-beginning
    • 查询消费者: .\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list

# 客户端

package com.automannn.demo.mq.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author automannn
 * @Date 2022/4/15
 */
@Slf4j
public class KafkaTest {

    public static void main(String[] args) {
        Producer producer = KafkaProducerFactory.getProducer();
        for (int i=0;i<10;i++){
            producer.send(new ProducerRecord("automannn",String.valueOf(i),"this is msg-"+i));
        }
        producer.close();

        Consumer consumer =KafkaConsumerFactory.getConsumer();
        consumer.subscribe(Arrays.asList("automannn"));
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                log.debug("consume msg: offset = {},key={},value={}",record.offset(),record.key(),record.value());
            }
        }
    }

    static class KafkaProducerFactory{
        public static Producer getProducer(){
            Properties props = new Properties();
            //kafka服务端地址
            props.put("bootstrap.servers","localhost:9092");
            //判断请求是否成功的条件,all将会阻塞消息,性能最低,但最可靠
            props.put("acks","all");
            //请求失败,自动重试的重试次数
            props.put("retries",0);
            //生产者缓存每个分区未发送的信息
            props.put("batch.size",16348);
            //发送缓存的时间
            props.put("linger.ms",1);
            //生产者可用的缓存总量
            props.put("buffer.memory",33554432);
            //键值序列化器
            props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

            return new KafkaProducer(props);
        }
    }

    static class KafkaConsumerFactory{

        public static Consumer getConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers","localhost:9092");
            //消费者组名称
            props.put("group.id","automannnTest");
            //偏移量由 auto.commit.interval.ms控制自动提交的频率
            props.put("enable.auto.commit","true");
            props.put("auto.commit.interval.ms","1000");

            //序列化器
            props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

            return new KafkaConsumer(props);
        }
    }
}