# 消息队列
- 消息传输过程中,用来保存消息的容器;
- 消息队列,是一种 应用程序对应用程序的通信方法;
- 队列的使用,出去了接收和发送应员工程序同时执行的要求;
# RabbitMQ
# 发展历程
整理自玩转RabbitMQ之二:RabbitMQ简史 (opens new window)
- 一种通用的软件总线,可以 供各种应用程序接入,就像计算机通过硬件总线接入连接各种硬件设备一样
- 思路来源于硬件总线,开发**软件总线(Teknekron)**并应用于金融交易场所,后来诞生消息队列通信软件的鼻祖: The Information Bus (TIB)
- 1994年左右,微软与IBM分别研发出 IBM MQ系列和 MSMQ系列。
- 传统MQ各家厂商标准不一致,存在产品壁垒。为解决这个问题,SUN的工程师创造了JMS。事实证明,使用JMS让应用程序变得更加脆弱.
- 2004年,为了寻找比JMS更好的通信方案,一群大佬开发出了AMQP(Advanced Message Queuing Protocol),2006年初稿发布
- 2007年,RabbitMQ之父,开发Metalogic时,积累了Erlang和分布式的经验,偶然接触到AMQP,便通过Erlang来开发了AMQP服务器;
# 发布历史
- 2007.02: 发布1.0版本
- 2010.08:发布2.0版本
- 2012.11:发布3.0版本
- 2021.07:发布3.9版本
# 特点
- Erlang语言开发
- 基于AMQP协议实现
- 适用于对数据一致性,稳定性和可靠性要求很高的场景
- 对性能和吞吐量 的要求在其次
# 使用场景
- 解耦的场合:关心的是“通知”,而非“处理”, 下单->出库
- 异步的场合: 异步并行通知,提升效率
- 削峰的场合:秒杀场景
# 弊端
- 可用性降低,MQ挂了以后,会导致系统崩溃
- 系统复杂性提高: 重复消费问题?如何保证可靠传递?如何保证消息传递顺序?
- 业务一致性问题:下单发送消息后,直接返回成功。若后续服务失败,会产生数据不一致问题
# 工作模式
整理至: RabbitMQ原理讲解——一只特立独行的狗 (opens new window)
# 基本概念
- Channel(信道): 建立在真实的TCP连接内的虚拟连接,双向数据流通道
- Producer(生产者): 向消息队列发布消息的客户端程序
- Consumer(消费者): 从消息队列中,获取消息的客户端程序
- Message(消息): 由消息头和消息体组成; 消息体不透明;消息头由一系列可选属性组成,如: routing-key(路由键)、priority(优先级)、delivery-mode(是否持久化存储)等。
- Routing Key(路由键): 消息头的一个属性,用于标记消息的路由规则,决定交换机的转发路径。 最大长度255字节
- Queue(消息队列): 存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。,它是消息的容器,也是消息的终点。 一条消息只能被一个订阅者接收
- Exchange(交换机|路由器): 提供Producer到Queue之间的匹配,接收生产者发送的消息,并将这些消息按照路由规则,转发到消息队列。如果没有Queue绑定到Exchange,则会直接丢弃消息
- Binding(绑定): 用于建立 Exchange 和 queue之间的关联。
- Binding Key(绑定键): Exchange与Queue的绑定关系,用于匹配Routing Key。 最大长度255字节。
- Broker: RabbitMQ Server,服务器实体
# 消息调度策略
- 消息的调度策略与三个因素有关: Exchange Type(交换机类型)、Binding Key(交换机和队列的绑定关系)、Routing key(消息的标记信息)
- Exchange 根据消息的 Routing Key和Exchange绑定Queue的Binding Key分配消息。
- 生产者在发送消息给Exchange时,一般会指定顶一个Routing key,来指定这个消息的路由规则。它需要与 Exchange Type及Binding Key联合使用才能最终生效。
- 在Exchange Type与 Bind Key固定的情况下(一般这些内容,都是提前固定配置好的),生产者通过指定Routing Key即可决定消息流向哪里。
# 交换机类型
# Fanout交换机(订阅模式|广播模式)
- 该种模式下,交换机把所有接收到的消息,发送到所有与该交换机绑定的消息队列中。
- 此种模式,与BindingKey与RoutingKey无关
# Direct(路由模式)
- 需要消息的RoutingKey与BindingKey完全匹配时,消息才会被转发到消息队列。
- 该模式时交换机的默认模式。
- RabbitMQ默认提供了一个交换机,名字时空字符串,类型是Direct,绑定到所有的Queue,每一个Queue与该无名交换机之间的BindingKey是Queue的名字,因此我们可以不指定交换器也可以发送和接收消息,实际上使用的是默认的Exchange。
# Topic(通配符模式|主题模式)
- 用消息的RoutingKey与BindingKey进行模糊匹配,匹配成功,将消息分发到该Queue
- RoutingKey是一个句点号"."分隔的字符串,被句点号分隔开的的每一段,称为一个单词
- BindingKey可以存在两种特殊字符,“*”与“#”,用于模糊匹配。 "*"用于匹配一个单词,"#"用于匹配多个单词
# RPC机制
# RPC机制的必要性
- MQ本身基于异步进行消息处理,生产者对消息的处理情况无感知;
- 实际应用场景,可能会出现同步处理的场景,即等待消费者将消息消费后,再进行下一步处理。相当于RPC调用。
# 实现机制
- 生产者发送消息时,在消息属性中,设置两个值(AMQP协议所定义的),: replyTo(queue的名称,消费者处理完成后通知); correlationId(此次请求的标识号,消费者处理完成后,将此属性一并返回,用于解释消息的唯一性);
- 消费者收到消息并处理
- 消费者处理完消息后,生成一条应答消息到 replyTo指定的 Queue,同时带上correlationId属性
- 生产者之前已订阅replyTo指定的Queue,根据correlationId分析哪条消息被处理了,并执行后续业务处理。
# 消息确认机制
- 实际业务场景中,可能出现消费者收到Queue的消息,但没处理完后就出现宕机,导致消息丢失。
- 为了避免这种情况,可以让消费者再消费完消息后,发送一个回执给RabbitMQ,RabbitMQ收到回执后,才将该消息删除。
- 如果在处理过程中,消费者断开了,则会将该消息发送给其它消费者进行处理。不存在Timeout概念,除非消费者断开
- 如果开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,则可能产生消息堆积。
# 消息持久化机制
- 当我们希望在RabbitMQ服务重启的情况下,也不会丢失消息时,就可以将Queue与Message设置为可持久化的。
- 此时依然可能出现小概率丢失事件的发生(接收到生产者消息,还没来得及持久化,就宕机的情况)。
- 此时可以借助事务,将小概率事件管理起来。
# 事务机制
- 通过txSelect()可以开启一个事务
- 生产者发送消息给服务器
- 通过txCommit()提交事务
- 当txCommit()提交事务成功,则该消息一定会持久化
# 消息分发机制
- 一般情况下,生产者生产消息时比较快的,消费者由于涉及到一些业务逻辑处理,消费速度慢于生产速度,容易导致消息堆积;
- 此时使用工作队列,多个消费者同时消费数据;
- 工作队列有两种分发数据方式: 轮询分发和公平分发。
- 轮询分发,不会因为多个消费者处理数据的速度不一致,而影响分发结果;
- 公平分发通常与手动应答配合使用,不会出现消息丢失且业务逻辑没能处理的情况;
# 部署
- 安装Erlang/OTP库,并保持版本匹配,
OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库
- 安装RabbitMQ
- 安装管理插件
rabbitmq-plugins.bat enable rabbitmq_management
# 消费模式
# 无序消费:
多线程并发消费
# 有序消费
- 生产者角度: 对于需要顺序的消息,发到同一个队列中;
- 消费者角度: 使用MessageListenerOrderly,此时消费者会定期锁住topic的所有队列,保证只有它在消费,本地则使用单线程保证按序消费
# 集群消费
- 同一个消费组里的消费者,会分摊消息,比如A消费了,B就不能消费;
- 默认情况下,采用的是这种方式
# 广播消费
- 同一个消费组里的消费者,每个消费者都能收到消息,AB能消费同一个消息,相当于广播;
- 应用场景,如配置刷新,需要更新每一个机器实例;
- **实际用的比较少,且存在很多问题。网上各种回复不一,实践过程中产生的现象记录一下:**消费者需要先于生产者启动;广播模式下,消费者点位不会移动;无法通过dashborad发送广播消息;
# 客户端应用
# 通用
- 生产者
package com.automannn.demo.mq.rabbit.common;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author automannn
* @Date 2022/4/13
*/
@Slf4j
public class SimpleProvider{
//xxx: 是否持久化
boolean durable = false;
//xxx: 队列是否独有(其他信道无法访问)
boolean exclusive = false;
//xxx: 是否自动删除队列,队列没有消费者时,删除
boolean autoDelete = false;
final private String queueName;
private Connection connection;
private Channel channel;
private String host;
private int port;
private String userName;
private String password;
public SimpleProvider(String host,int port,String userName,String password,String queueName){
Assert.isTrue(!StringUtils.isEmpty(queueName),"parameter name cannot be null!");
this.host = host;
this.port = port;
this.userName = userName;
this.password = password;
this.queueName = queueName;
this.init();
}
public void init(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(userName);
factory.setPassword(password);
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName,durable,exclusive,autoDelete,null);
} catch (Exception e) {
e.printStackTrace();
}
}
public void send(String msg){
Assert.isTrue(!StringUtils.isEmpty(msg),"parameter msg cannot be null!");
try {
//xxx: 参数分别为: 交换机,routingKey,消息属性,消息体
channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));
log.debug("消息投递成功!");
} catch (IOException e) {
e.printStackTrace();
}
}
public void declareFanout(String exchangeName){
try {
channel.exchangeDeclare(exchangeName,"fanout");
//默认要绑定本队列
channel.queueBind(queueName,exchangeName,"");
} catch (IOException e) {
e.printStackTrace();
}
}
public void publish(String exchangeName,String msg){
try {
channel.basicPublish(exchangeName,"",null,msg.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
}
public void publish(String exchangeName,String msg,String routingKey){
try {
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
}
public void close(){
try {
this.channel.close();
this.connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendRpc(String msg){
BlockingQueue<String> responseQueue = new ArrayBlockingQueue<String>(1);
String corrId = UUID.randomUUID().toString();
String replyQueueName = "automannnReplyQueue";
try {
channel.queueDeclare(replyQueueName,false,false,false,null);
} catch (IOException e) {
e.printStackTrace();
}
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().replyTo(replyQueueName).correlationId(corrId).build();
try {
channel.basicPublish("",queueName,properties,msg.getBytes(StandardCharsets.UTF_8));
String ctag = channel.basicConsume(replyQueueName,true,(tag,delivery)->{
if(delivery.getProperties().getCorrelationId().equals(corrId)){
responseQueue.offer(new String(delivery.getBody(),StandardCharsets.UTF_8));
}
},tag->{});
String result = responseQueue.take();
System.out.println(result);
//取消订阅
channel.basicCancel(ctag);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 消费者
package com.automannn.demo.mq.rabbit.common;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author automannn
* @Date 2022/4/13
*/
@Slf4j
public class SimpleConsumer{
private String host;
private int port;
private String username;
private String password;
private Connection connection;
private Channel channel;
private String queueName;
private AtomicBoolean consuming=new AtomicBoolean(false);
//业务回调
private DeliverCallback deliverCallback;
public SimpleConsumer(String host,int port,String username,String password,String queueName){
this.host=host;
this.port = port;
this.username = username;
this.password = password;
this.queueName = queueName;
this.init();
}
private void init(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName,false,false,false,null);
} catch (Exception e) {
e.printStackTrace();
}
}
public void consume(DeliverCallback callback){
if (consuming.compareAndSet(false, true)) {
try {
channel.basicConsume(queueName,true,callback,tag->{});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public Channel getChannel() {
return channel;
}
public void declareFanout(String exchangeName){
try {
channel.exchangeDeclare(exchangeName,"fanout");
//默认要绑定本队列
channel.queueBind(queueName,exchangeName,"");
} catch (IOException e) {
e.printStackTrace();
}
}
public void declareTopic(String topicExchangeName,String routingKey){
try {
channel.exchangeDeclare(topicExchangeName,"topic");
//默认要绑定本队列
channel.queueBind(queueName,topicExchangeName,routingKey);
} catch (IOException e) {
e.printStackTrace();
}
}
//工作队列方式,手动确认
public void consumeManualAck(DeliverCallback callback){
if (consuming.compareAndSet(false, true)) {
try {
//xxx: 每次只读取一个消息
channel.basicQos(1);
boolean autoAck = false;
channel.basicConsume(queueName,autoAck,callback,tag->{});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void close(){
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
# 匿名交换机
package com.automannn.demo.mq.rabbit.simple;
import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author automannn
* @Date 2022/4/12
*/
@Slf4j
public class SimpleBrokerTest {
public static void main(String[] args) {
String queueName = "automannn.testQueue";
String host = "127.0.0.1";
int port = 5672;
String userName = "guest";
String password = "guest";
SimpleConsumer consumer = new SimpleConsumer(host,port,userName,password,queueName);
DeliverCallback callback = (tag,deliver)->{
String message = new String(deliver.getBody(),StandardCharsets.UTF_8);
log.debug("====消费者消费消息======");
log.debug(message);
};
//xxx: 消费
consumer.consume(callback);
SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
provider.send("hello,automannn!");
}
}
# 工作队列
package com.automannn.demo.mq.rabbit.workQueues;
import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/13
*/
@Slf4j
public class WorkQueueTest {
public static void main(String[] args) {
String queueName = "automannn.testQueue";
String host = "127.0.0.1";
int port = 5672;
String userName = "guest";
String password = "guest";
SimpleConsumer consumer1 = new SimpleConsumer(host,port,userName,password,queueName);
DeliverCallback callback1 = (tag, deliver)->{
String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("====消费者消费消息======");
log.debug(message);
boolean multiple = false;
consumer1.getChannel().basicAck(deliver.getEnvelope().getDeliveryTag(),multiple);
};
consumer1.consumeManualAck(callback1);
SimpleConsumer consumer2 = new SimpleConsumer(host,port,userName,password,queueName);
DeliverCallback callback2 = (tag, deliver)->{
String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("====消费者消费消息======");
log.debug(message);
boolean multiple = false;
consumer2.getChannel().basicAck(deliver.getEnvelope().getDeliveryTag(),multiple);
};
consumer2.consumeManualAck(callback2);
SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
for (int i=0;i<10;i++){
provider.send("hello,automannn! sequence is "+i);
}
}
}
# 扇形交换机
package com.automannn.demo.mq.rabbit.fanout;
import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/13
*/
@Slf4j
public class FanoutExchangeTest {
public static void main(String[] args) {
String queueName = "automannn.testQueue";
String host = "127.0.0.1";
int port = 5672;
String userName = "guest";
String password = "guest";
String fanoutExchangeName= "automannnFanoutExchange";
DeliverCallback callback = (tag, deliver)->{
String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
log.debug("====消费者消费消息======");
log.debug(message);
};
SimpleConsumer consumer1 = new SimpleConsumer(host,port,userName,password,queueName+"1");
SimpleConsumer consumer2 = new SimpleConsumer(host,port,userName,password,queueName+"2");
SimpleConsumer consumer3 = new SimpleConsumer(host,port,userName,password,queueName+"3");
SimpleConsumer consumer4 = new SimpleConsumer(host,port,userName,password,queueName+"4");
consumer1.declareFanout(fanoutExchangeName);
consumer2.declareFanout(fanoutExchangeName);
consumer3.declareFanout(fanoutExchangeName);
consumer4.declareFanout(fanoutExchangeName);
consumer1.consume(callback);
consumer2.consume(callback);
consumer3.consume(callback);
consumer4.consume(callback);
SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
provider.declareFanout(fanoutExchangeName);
provider.publish(fanoutExchangeName,"hello,automannn...");
}
}
# 主题交换机
package com.automannn.demo.mq.rabbit.topic;
import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/13
*/
@Slf4j
public class TopicExchangeTest {
public static void main(String[] args) {
String queueName = "automannn.testQueue";
String host = "127.0.0.1";
int port = 5672;
String userName = "guest";
String password = "guest";
String topicExchangeName= "automannnTopicExchange";
DeliverCallback callback = (tag, deliver)->{
String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
log.debug("====消费者消费消息======");
log.debug(message);
};
String topicGroup1 ="user.*";
String topicGroup2 = "msg.*";
SimpleConsumer consumer1 = new SimpleConsumer(host,port,userName,password,queueName+"a");
SimpleConsumer consumer2 = new SimpleConsumer(host,port,userName,password,queueName+"b");
SimpleConsumer consumer3 = new SimpleConsumer(host,port,userName,password,queueName+"c");
SimpleConsumer consumer4 = new SimpleConsumer(host,port,userName,password,queueName+"d");
consumer1.declareTopic(topicExchangeName,topicGroup1);
consumer1.declareTopic(topicExchangeName,topicGroup2);
consumer2.declareTopic(topicExchangeName,topicGroup1);
consumer3.declareTopic(topicExchangeName,topicGroup2);
consumer4.declareTopic(topicExchangeName,topicGroup1);
consumer4.declareTopic(topicExchangeName,topicGroup2);
consumer1.consume(callback);
consumer2.consume(callback);
consumer3.consume(callback);
consumer4.consume(callback);
SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
provider.publish(topicExchangeName,"hello,automannn. userTopic","user.dfdfd");
provider.publish(topicExchangeName,"hello,automannn. msgTopic","msg.ssd");
}
}
# rpc调用
package com.automannn.demo.mq.rabbit.rpc;
import com.automannn.demo.mq.rabbit.common.SimpleConsumer;
import com.automannn.demo.mq.rabbit.common.SimpleProvider;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/13
*/
@Slf4j
public class RpcTest {
public static void main(String[] args) {
String queueName = "automannn.testQueue";
String host = "127.0.0.1";
int port = 5672;
String userName = "guest";
String password = "guest";
SimpleConsumer consumer = new SimpleConsumer(host,port,userName,password,queueName);
DeliverCallback callback = (tag, deliver)->{
String message = new String(deliver.getBody(), StandardCharsets.UTF_8);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("====消费者消费消息======");
log.debug(message);
boolean multiple = false;
String responseMsg = "消费成功";
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(deliver.getProperties().getCorrelationId()).build();
consumer.getChannel().basicPublish("",deliver.getProperties().getReplyTo(),replyProps,responseMsg.getBytes(StandardCharsets.UTF_8));
consumer.getChannel().basicAck(deliver.getEnvelope().getDeliveryTag(),multiple);
};
consumer.consume(callback);
SimpleProvider provider = new SimpleProvider(host,port,userName,password,queueName);
provider.sendRpc("hello,automannn!");
}
}
# rocketmq
# 概述
- 阿里巴巴出品的一款纯JAVA,分布式,队列模型的开源消息中间件
# 发展历程
- 2007年,阿里开始五彩石项目,Notify作为交易核心信息流转系统诞生,成为RocketMQ的雏形;
- 2010年,案例大规模使用ActiveMQ作为阿里的消息内核,但它不具备海量堆积能力
- 2011年,kafka开源,对Kafka进行深入研究后,开发出MetaMQ
- 2012年,MetaMQ进一步抽象,形成RocketMQ,并开源
- 2016年,RocketMQ承载万亿级消息流转,11月,捐赠给Apache基金会
- 2017年,成为Apache的顶级项目;
# 发布历史
- 2017.01: 4.0发布
- 2019.01: 4.4发布
- 2022.03: 5.0发布
# 工作原理

# 基本概念
- 消息(Message): 生产和消费数据的最小单位,每条消息必须属于一个主题;
- 主题(Topic): 表示一类消息的集合;
- 标签(Tag): 为消息设置的标签,用于区分同一主题下不同类型的消息,(topic是消息的一级分类,tag是消息的二级分类);
- 队列(Queue): 消息的物理实体;
- 生产者(Producer): 生产者,可集群部署;
- 消费者(Consumer): 消费者,可集群部署;
- Broker: 负责消息的存储,查询消费,支持主从部署;
- NameServer: Topic路由注册中心,支持Broker的动态注册和发现;
# 安装部署
- 启动NamingServer :
mqnamesrv.cmd
- 启动mqbroker:
mqbroker.cmd
- 编译并启动rocketmq-dashboard-master
# 应用客户端
- 通用
package com.automannn.demo.mq.rocket.common;
/**
* @author automannn
* @Date 2022/4/14
*/
public interface RocketMqConfigInstant {
String NAME_SERVER_ADDRESS = "127.0.0.1:9876";
}
package com.automannn.demo.mq.rocket.common;
import org.apache.rocketmq.client.ClientConfig;
/**
* @author automannn
* @Date 2022/4/14
*/
public class RocketClientBuilder {
private RocketClientBuilder(){
}
public static RocketClientBuilder getBuilder(){
return new RocketClientBuilder();
}
public ClientConfig build(){
ClientConfig clientConfig = new ClientConfig();
//nameServer地址有多个时,用逗号隔开
clientConfig.setNamesrvAddr(RocketMqConfigInstant.NAME_SERVER_ADDRESS);
//客户端实例名称
clientConfig.setInstanceName("DEFAULT");
//通信层异步回调线程数,默认值4
clientConfig.setClientCallbackExecutorThreads(10);
//轮询 Name Server 间隔时间
clientConfig.setPollNameServerInterval(30000);
//向broker发送心跳间隔时间
clientConfig.setHeartbeatBrokerInterval(30000);
//持久化Consumer 消费进度间隔时间,默认5000
clientConfig.setPersistConsumerOffsetInterval(10000);
return clientConfig;
}
}
package com.automannn.demo.mq.rocket.common;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/**
* @author automannn
* @Date 2022/4/14
*/
public class RocketMqConsumerFactory {
//主动权在 consumer,consumer主动拉取消息
public static DefaultMQPullConsumer getPullConsumer(){
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer();
pullConsumer.resetClientConfig(RocketClientBuilder.getBuilder().build());
return pullConsumer;
}
//主动权在 broker,主动推送消息到consumer
public static DefaultMQPushConsumer getPushConsumer(){
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
pushConsumer.resetClientConfig(RocketClientBuilder.getBuilder().build());
return pushConsumer;
}
}
package com.automannn.demo.mq.rocket.common;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/**
* @author automannn
* @Date 2022/4/14
*/
public class RocketMqProducerFactory {
public static DefaultMQProducer getProducer(){
DefaultMQProducer producer = new DefaultMQProducer();
producer.resetClientConfig(RocketClientBuilder.getBuilder().build());
//发送消息时,自动创建服务器不存在的topic,默认创建4个队列
producer.setDefaultTopicQueueNums(4);
//发送消息超时时间
producer.setSendMsgTimeout(10000);
//消息body压缩阈值,4096
producer.setCompressMsgBodyOverHowmuch(4096);
//消息重试
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
return producer;
}
}
package com.automannn.demo.mq.rocket.common;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author automannn
* @Date 2022/4/14
*/
@Slf4j
public class ConsumerHandler {
private static final AtomicInteger instanceNum = new AtomicInteger(6);
public void handle(String groupName,String topic,String tags){
DefaultMQPushConsumer pushConsumer = RocketMqConsumerFactory.getPushConsumer();
pushConsumer.setConsumerGroup(groupName);
pushConsumer.setInstanceName("instance"+instanceNum.getAndIncrement());
try {
pushConsumer.subscribe(topic,tags);
} catch (MQClientException e) {
e.printStackTrace();
}
pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, context)->{
context.setAutoCommit(true);
log.debug(Thread.currentThread().getName()+" receive new messages: "+ msgs+ "%n");
log.debug(new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
});
try {
pushConsumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void handleBroadCast(String groupName,String topic,String tags){
DefaultMQPushConsumer pushConsumer = RocketMqConsumerFactory.getPushConsumer();
//消费模型: 广播模式
pushConsumer.setMessageModel(MessageModel.BROADCASTING);
pushConsumer.setConsumerGroup(groupName);
try {
pushConsumer.subscribe(topic,tags);
} catch (MQClientException e) {
e.printStackTrace();
}
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->{
log.debug(Thread.currentThread().getName()+" receive new messages: "+ msgs+ "%n");
log.debug(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
try {
pushConsumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
- 单向消息
package com.automannn.demo.mq.rocket.onewayMsg;
import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/14
* @Description 单向传输,中等可靠性,如日志收集
*/
@Slf4j
public class OnewayProducerTest {
public static void main(String[] args) {
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("oneway");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i=0;i<100;i++){
Message msg = new Message("OneWayTopic","TagA",("hello,Autmannn:"+i).getBytes(StandardCharsets.UTF_8));
try {
producer.sendOneway(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
DefaultMQPushConsumer pushConsumer = RocketMqConsumerFactory.getPushConsumer();
pushConsumer.setConsumerGroup("oneway");
try {
pushConsumer.subscribe("OneWayTopic","TagA || TagB || TagC");
} catch (MQClientException e) {
e.printStackTrace();
}
pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, context)->{
context.setAutoCommit(true);
log.debug(Thread.currentThread().getName()+" receive new messages: "+ msgs+ "%n");
log.debug(new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
});
try {
pushConsumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
// producer.shutdown();
// pushConsumer.shutdown();
}
}
- 异步可靠消息
package com.automannn.demo.mq.rocket.asyncMsg;
import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/14
* @Description 可靠异步消息传输: 通常用于响应时间敏感的业务场景
*/
@Slf4j
public class AsyncMsgTest {
public static void main(String[] args) {
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("async");
producer.setRetryTimesWhenSendAsyncFailed(0);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i=0;i<5;i++){
final int index =i;
Message msg = new Message("asyncMessage","TagA","OrderID188",("hello,automann..asyncMsg "+i).getBytes(StandardCharsets.UTF_8));
try {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("第{}个消息投递成功,消息号为{}",index,sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
ConsumerHandler handler = new ConsumerHandler();
handler.handle("async","asyncMessage","");
// producer.shutdown();
}
}
- 同步可靠消息
package com.automannn.demo.mq.rocket.syncMsg;
import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/14
* @Description 可靠同步传输,可用于广泛的场景,如重要消息通知,短信通知,短信营销系统等
*/
@Slf4j
public class SyncMsgTest {
public static void main(String[] args) {
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("sync");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i=0;i<100;i++){
Message msg = new Message("syncTopic","TagA",("hello,Automannn.Sync msg: "+i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = null;
try {
sendResult = producer.send(msg);
} catch (Exception e) {
e.printStackTrace();
}
log.debug("send success"+sendResult);
}
ConsumerHandler handler = new ConsumerHandler();
handler.handle("sync","syncTopic","");
// producer.shutdown();
}
}
- 延迟消息
package com.automannn.demo.mq.rocket.scheduleMsg;
import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/14
*/
@Slf4j
public class ScheduleMsgTest {
public static void main(String[] args) {
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("schedule");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i=0;i<15;i++){
long time = System.currentTimeMillis();
Message msg = new Message("scheduleTopic",("hello,automannn.scheduleMsg:"+time).getBytes(StandardCharsets.UTF_8));
//延时级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(7);
try {
SendResult sendResult = producer.send(msg);
log.debug("send success"+sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
ConsumerHandler handler = new ConsumerHandler();
handler.handle("schedule","scheduleTopic","");
//producer.shutdown();
}
}
- 过滤消息
package com.automannn.demo.mq.rocket.filterMsg;
import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/14
*/
@Slf4j
public class FilterMsgTest {
public static void main(String[] args) {
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("filter");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i=0;i<10;i++){
Message message = new Message("filterTopic","tagA",("hello,filter-"+i).getBytes(StandardCharsets.UTF_8));
message.putUserProperty("a",String.valueOf(i));
try {
SendResult sendResult = producer.send(message);
log.debug("send success: "+ sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
DefaultMQPushConsumer consumer = RocketMqConsumerFactory.getPushConsumer();
consumer.setConsumerGroup("filter");
//当消息中有 a属性,且满足限制条件,才消费
try {
consumer.subscribe("filterTopic", MessageSelector.byTag("tagA"));
//consumer.subscribe("filterTopic", MessageSelector.bySql("a >= 0 and a<= 3"));
} catch (MQClientException e) {
e.printStackTrace();
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{
log.debug("receive msg: "+msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
// consumer.shutdown();
// producer.shutdown();
}
}
- 有序消息
package com.automannn.demo.mq.rocket.orderMsg;
import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author automannn
* @Date 2022/4/14
*/
@Slf4j
public class OrderMsgTest {
public static void main(String[] args) {
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("order");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i=0;i<20;i++){
//根据业务进行定义,相同的id ,会有序分到一个 queue
int virtualId = i%2;
long time = System.currentTimeMillis();
String[] tags = new String[]{"tagA","tagB"};
Message message = new Message("orderTopic",tags[i%tags.length],"key"+i,("hello,automannn. orderMsg "+time).getBytes(StandardCharsets.UTF_8));
try {
SendResult sendResult = producer.send(message,(MessageQueueSelector)(mqs,msg,arg)->{
Integer id = (Integer) arg;
int index = id%mqs.size();
return mqs.get(index);
}, virtualId);
log.debug("send success: "+sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
DefaultMQPushConsumer consumer = RocketMqConsumerFactory.getPushConsumer();
consumer.setConsumerGroup("order");
try {
consumer.subscribe("orderTopic","tagA || tagB");
} catch (MQClientException e) {
e.printStackTrace();
}
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
consumeOrderlyContext.setAutoCommit(true);
log.debug("msgs size = "+ list.size());
for (MessageExt messageExt: list){
log.debug("receive new Message: "+messageExt);
log.debug(new String(messageExt.getBody()));
}
this.consumeTimes.incrementAndGet();
log.debug("consume times : {}",consumeTimes.get());
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
consumeOrderlyContext.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
// consumer.shutdown();
// producer.shutdown();
}
}
- 事务消息
package com.automannn.demo.mq.rocket.transactionMsg;
import com.automannn.demo.mq.rocket.common.RocketMqConfigInstant;
import com.automannn.demo.mq.rocket.common.RocketMqConsumerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author automannn
* @Date 2022/4/14
*/
@Slf4j
public class TxMsgTest {
public static void main(String[] args) {
TransactionCheckListener listener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction");
//事务回查最小并发数、最大并发数
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setNamesrvAddr(RocketMqConfigInstant.NAME_SERVER_ADDRESS);
//队列数
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(listener);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
TransactionExecuterImpl executer = new TransactionExecuterImpl();
for(int i=0;i<4;i++){
Long time = System.currentTimeMillis();
Message msg = new Message("transactionTopic","tagA","KEY"+i,("hello,automann.TransactionMsg-"+i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult= null;
try {
sendResult = producer.sendMessageInTransaction(msg,executer,null);
} catch (MQClientException e) {
e.printStackTrace();
}
log.debug("send success: "+sendResult);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*===========consumer start===============*/
DefaultMQPushConsumer consumer = RocketMqConsumerFactory.getPushConsumer();
consumer.setConsumerGroup("transaction");
try {
consumer.subscribe("transactionTopic","tagA");
} catch (MQClientException e) {
e.printStackTrace();
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{
log.debug("consume success: "+msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
// consumer.shutdown();
// producer.shutdown();
}
//未决事务,服务器回查客户端
static class TransactionCheckListenerImpl implements TransactionCheckListener{
private AtomicInteger transactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
log.debug("server checking txMsg: "+ messageExt);
int value = transactionIndex.getAndIncrement();
if(value==0){
log.error("count not find db");
throw new RuntimeException("could not find db");
}else if(value==1){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else if(value==2){
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
//执行本地事务
static class TransactionExecuterImpl implements LocalTransactionExecuter{
private AtomicInteger transactionIndex = new AtomicInteger(1);
@Override
public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
int value = transactionIndex.getAndIncrement();
log.debug("value == "+value);
if(value==0){
log.error("could not find db");
throw new RuntimeException("could not find db");
}else if (value==1){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else if (value==2){
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
}
- 广播消息
package com.automannn.demo.mq.rocket.broadcasting;
import com.automannn.demo.mq.rocket.common.ConsumerHandler;
import com.automannn.demo.mq.rocket.common.RocketMqProducerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/14
*/
@Slf4j
public class BroadcastingTest {
public static void main(String[] args) {
ConsumerHandler handler1 = new ConsumerHandler();
ConsumerHandler handler2 = new ConsumerHandler();
handler1.handle("broadcast202204151","broadcastTopic20220415","tagA");
handler2.handle("broadcast202204152","broadcastTopic20220415","tagA");
// handler1.handleBroadCast("broadcast202204151","broadcastTopic20220415","tagA");
// handler2.handleBroadCast("broadcast202204152","broadcastTopic20220415","tagA");
DefaultMQProducer producer = RocketMqProducerFactory.getProducer();
producer.setProducerGroup("broadcast20220415");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
Message message = new Message("broadcastTopic20220415","tagA","KEY1","hello,boradcast Msg.the consumer mast start before producer".getBytes(StandardCharsets.UTF_8));
try {
for (int i=0;i<1;i++){
SendResult sendResult= producer.send(message);
log.debug("send success : "+sendResult );
}
} catch (Exception e) {
e.printStackTrace();
}
// producer.shutdown();
}
}
# kafka
# 概述
- Apache软件基金会开发的开源流处理平台,由
scala
和java
编写. - 高吞吐量的分布式发布订阅消息系统.
- kafka无特别含义,当时的开发架构师喜欢
franz kafka
(一个小说家).
# 发展史
- 2010年左右,
Linkedin
为了解决数据管道问题,由于ActiveMQ
无法满足需求,决定研发自己的消息传递系统; - 于2011年,贡献给Apache基金会,2012年,成为apache的顶级开源项目.
# 发布史
- 0.7,发布于2012.01,是第一个发布版本;
- 1.0,发布于2017.11
- 2.0,发布于2018.07
- 3.0,发布于2021.09
- 3.1,发布于2022.01,是目前的最新版
# kafka术语
- 生产者
- 向broker发布消息的应用程序
- 同时负责选择发布到topic上的哪一个分区
- 消费者
- 从消息队列中,获取消息的客户端程序
- 若干消费者组成一个消费者组,一条消息只能被一个消费者消费;
- 所有消费者都在一个组内,则行成queue模型;所有消费者都在不同组,则形成了发布-订阅模型;
- broker
- kafka实例,多个broker组成一个kafka集群
- 通常一台机器部署一个kafka实例
- topic
- 一组消息的归纳
- 服务器端消息的逻辑存储单元。一个topic通常包含若干个Partition分区
- 分区
- 一个分区可看作一个队列
- 若干个分区,可以被若干个consumer同时消费,达到消费者高吞吐量
- 单个分区有序,整体无序
- message
- 消息,也称日志消息,是kafka服务端实际存储的数据
- 每一条消息都由一个key,一个value,以及时间戳组成
- offset
- 偏移量,分区中的消息位置,由kafka自身维护
- consumer消费时,也要保存一份offset以维护消费的位置信息
# 服务端安装
- 安装及运行
zookeeper
- 配置日志地址,安装及启动
kafka
:.\kafka-server-start.bat ../../config/server.properties
- 验证
- 创建主题:
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic automannn
- 查询主题:
.\kafka-topics.bat --list --zookeeper localhost:2181
- 启动生产者:
.\kafka-console-producer.bat --broker-list localhost:9092 --topic automannn
- 启动消费者:
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic automannn --from-beginning
- 查询消费者:
.\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
- 创建主题:
# 客户端
package com.automannn.demo.mq.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Arrays;
import java.util.Properties;
/**
* @author automannn
* @Date 2022/4/15
*/
@Slf4j
public class KafkaTest {
public static void main(String[] args) {
Producer producer = KafkaProducerFactory.getProducer();
for (int i=0;i<10;i++){
producer.send(new ProducerRecord("automannn",String.valueOf(i),"this is msg-"+i));
}
producer.close();
Consumer consumer =KafkaConsumerFactory.getConsumer();
consumer.subscribe(Arrays.asList("automannn"));
while (true){
ConsumerRecords<String,String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.debug("consume msg: offset = {},key={},value={}",record.offset(),record.key(),record.value());
}
}
}
static class KafkaProducerFactory{
public static Producer getProducer(){
Properties props = new Properties();
//kafka服务端地址
props.put("bootstrap.servers","localhost:9092");
//判断请求是否成功的条件,all将会阻塞消息,性能最低,但最可靠
props.put("acks","all");
//请求失败,自动重试的重试次数
props.put("retries",0);
//生产者缓存每个分区未发送的信息
props.put("batch.size",16348);
//发送缓存的时间
props.put("linger.ms",1);
//生产者可用的缓存总量
props.put("buffer.memory",33554432);
//键值序列化器
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaProducer(props);
}
}
static class KafkaConsumerFactory{
public static Consumer getConsumer(){
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092");
//消费者组名称
props.put("group.id","automannnTest");
//偏移量由 auto.commit.interval.ms控制自动提交的频率
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
//序列化器
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer(props);
}
}
}