1.rabbitmq erlang语言开发,时效性最高
2.rocketmq 吞吐量高,时效性高,实现了事务消息,但在大数据方面需要自己写代码支持
3.kafka 超高的吞吐量,消息较少时可能会有延迟(kafka是堆积一波消息后发送)
1.常用的是topic订阅发布模型
发布订阅又有两种模式
- 集群消费方式 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
同一个topic下,不同的consumer均能收到消息,同一个consumer(group)则只能有一个消费者收到消息【默认集群模式下,如果是广播模式则都能消费到】
2.点对点模型
生产者发送的消息,已有一个消费者都收到
当新实例启动的时候,PushConsumer 会拿到本消费组 broker 已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次 Pull 请求。
如果这个消费进度在 Broker 并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
- CONSUMEFROMLAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息
- CONSUMEFROMFIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在 broker 的)全部消费一遍
- CONSUMEFROMTIMESTAMP:从某个时间点开始消费,和 setConsumeTimestamp() 配合使用,默认是半个小时以前
rocketmq、kafka等mq均有分区的概念。
分区是为了为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。broker的引入就是解决水平扩展问题的一个方案。
生产者发送的时候可以指定一个key选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)
例如:电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
既保证业务的顺序,同时又能保证业务的高性能。
生产者端重试
- 向broker发送消息时,如果由于网络抖动等原因导致消息发送失败,可以设置失败重试次数让消息重发
消费者端重试
- 由于网络等原因导致消息没法从broker发送到消费者端,此时MQ会重试直到发送成功(集群模式)
- 确保消费成功再ack,关闭自动ack设置手动ack。如果消费者端在执行后续消息处理后因为网络原因队列未收到ack,为了保证消息是肯定被至少消费成功一次,RocketMQ 会进行重试,把这批消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预
- 自动ack消费者收到消息就会就会ack更新位移
- 手动ack消费者消费完毕成功才会ack更新位移
常用的消息队列都能确保消息到达,但是不能保证唯一性,所以可能存在重复数据
生产者成功发送消息给队列时,队列会返回ack给生产者,但是当网络出现问题,队列成功收到消息,但是ack出现问题。生产者一般会重发消息,所以会导致队列中存在多条重复消息。
此外,如果消费者事务提交,但是返回ack网络出现问题,导致队列未收到ack,那么队列会重复发消息给消费者
保证接口的幂等性
- 乐观锁
- 唯一索引
- 记录每条被消费的消息的状态
事务消息
- 发送方先向 mq 发送一条 prepare 消息,如果 prepare 消息发送失败,则直接取消操作 如果消息发送成功,则执行本地事务
- 如果本地事务执行成功,则想 mq 发送一条 confirm 消息,如果发送失败,则发送回滚消息
- 订阅方定期消费 mq 中的 confirm 消息,执行本地事务,并发送 ack 消息。如果 B 系统中的本地事务失败,会一直不断重试,如果是业务失败,会向 A 系统发起回滚请求
- mq会提供一个消息回查的功能,会定期轮询那些未确认的 prepared 消息检查本地事务,如果该 prepare 消息本地事务处理成功,则重新发送 confirm 消息,否则直接回滚该消息
原因:
消费者消费消息的速度比不上生产者发送消息的速度
解决办法:
1 如果有慢sql慢逻辑等情况,优先修复问题。慢逻辑是否能异步执行
2 增加单节点线程数,或者增加消费者机节点横向扩展)
注:使用多线程来处理消费消息,因为如果某线程异常了不会影响主线程,到最后主线程消费者已ack给队列,消息已被删除,数据就无法恢复了
参考文章: RocketMQ——角色与术语详解 RocketMQ官方文档 分布式消息队列RocketMQ&Kafka – 消息的“顺序消费”– 一个看似简单的复杂问题 RocketMQ & Kafka 消息消费与消息重试 分布式事务,这一篇就够了