依星源码资源网,依星资源网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

【好消息,好消息,好消息】VIP会员可以发表文章赚积分啦 !
查看: 1398|回复: 0

【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

[复制链接] 主动推送

1万

主题

1万

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
12981
发表于 2022-4-24 09:47:23 | 显示全部楼层 |阅读模式
【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)
1.引入依赖
  1. <dependency>
  2.             <groupId>org.apache.rocketmq</groupId>
  3.             <artifactId>rocketmq-client</artifactId>
  4.             <version>4.3.2</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.apache.rocketmq</groupId>
  8.             <artifactId>rocketmq-spring-boot-starter</artifactId>
  9.             <version>2.0.3</version>
  10.         </dependency>
复制代码
2.Producer
public class MyProducer {

    public static void main(String[] args) throws Exception{
        // 构造Producer时,必须指定groupId
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        //指定NameServer的地址  只用namesrv的地址就行,它会从namesrv上拿到broker的地址和topic信息
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();

        int num = 0;
        while (num < 20) {
            num++;
             /**
              * rocketmq封装了Message
                   * String topic,
                   * String tags, 标签(分类)---> 筛选
                   * byte[] body
                   */
            Message message = new Message("my_test_topic", "", ("hello rocketmq:" + num).getBytes());
            //同步发送  发送消息,拿到返回SendResult
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        //关闭生产者
        producer.shutdown();
    }
}
  1. <blockquote>public class MyProducer {
复制代码

启动并发送消成功后,返回的SendResult如下:
003.png
SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态

FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成 SYNC_FLUSH 才会报这个错误)
FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK


001.png
3.Consumer

  1. public class MyConsumer {

  2.     public static void main(String[] args) throws MQClientException {
  3.         // 构造Consumer时,必须指定groupId
  4.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
  5.         consumer.setNamesrvAddr("localhost:9876"); // nameServer地址,用于获取broker、topic信息
  6.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  7.         // 指定订阅的主题与tag,通过tag可以定制性消费(*表示全部tag)
  8.         consumer.subscribe("my_test_topic", "*");
  9.                
  10.         // 异步消费
  11.         consumer.registerMessageListener(new MessageListenerConcurrently() {
  12.             @Override
  13.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  14.                                                             ConsumeConcurrentlyContext context) {
  15. //                System.out.println("Receive Message:" + msgs.toString());
  16.                     // 1 try catch(throwable)确保不会因为业务逻辑的异常,导致消息出现重复消费的现象
  17.                 // 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获,
  18.                 //   并且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  19.                     try {
  20.                             for(MessageExt msg:msgs){
  21.                                     String msgbody = new String(msg.getBody(), "utf-8");
  22.                                     System.out.println(" MessageBody: "+ msgbody);//输出消息内容
  23.                             }
  24.                     } catch (Exception e) {
  25.                             e.printStackTrace();
  26.                             return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
  27.                     }
  28.                     
  29.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收
  30.             }
  31.         });
  32.         //启动消费者
  33.         consumer.start();
  34.         System.out.println("消费者启动成功。。。");
  35.     }
  36. }
复制代码
收到消息的内容:

004.png

consumer Group:位于同一个consumer Group中的consumer实例

和producer Group中的各个produer实例承担的角色类似
同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。
002.png






扫码关注微信公众号,及时获取最新资源信息!下载附件优惠VIP会员5折;永久VIP免费
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

免责声明:
1、本站提供的所有资源仅供参考学习使用,版权归原著所有,禁止下载本站资源参与商业和非法行为,请在24小时之内自行删除!
2、本站所有内容均由互联网收集整理、网友上传,并且以计算机技术研究交流为目的,仅供大家参考、学习,请勿任何商业目的与商业用途。
3、若您需要商业运营或用于其他商业活动,请您购买正版授权并合法使用。
4、论坛的所有内容都不保证其准确性,完整性,有效性,由于源码具有复制性,一经售出,概不退换。阅读本站内容因误导等因素而造成的损失本站不承担连带责任。
5、用户使用本网站必须遵守适用的法律法规,对于用户违法使用本站非法运营而引起的一切责任,由用户自行承担
6、本站所有资源来自互联网转载,版权归原著所有,用户访问和使用本站的条件是必须接受本站“免责声明”,如果不遵守,请勿访问或使用本网站
7、本站使用者因为违反本声明的规定而触犯中华人民共和国法律的,一切后果自己负责,本站不承担任何责任。
8、凡以任何方式登陆本网站或直接、间接使用本网站资料者,视为自愿接受本网站声明的约束。
9、本站以《2013 中华人民共和国计算机软件保护条例》第二章 “软件著作权” 第十七条为原则:为了学习和研究软件内含的设计思想和原理,通过安装、显示、传输或者存储软件等方式使用软件的,可以不经软件著作权人许可,不向其支付报酬。若有学员需要商用本站资源,请务必联系版权方购买正版授权!
10、本网站如无意中侵犯了某个企业或个人的知识产权,请来信【站长信箱312337667@qq.com】告之,本站将立即删除。
郑重声明:
本站所有资源仅供用户本地电脑学习源代码的内含设计思想和原理,禁止任何其他用途!
本站所有资源、教程来自互联网转载,仅供学习交流,不得商业运营资源,不确保资源完整性,图片和资源仅供参考,不提供任何技术服务。
本站资源仅供本地编辑研究学习参考,禁止未经资源商正版授权参与任何商业行为,违法行为!如需商业请购买各资源商正版授权
本站仅收集资源,提供用户自学研究使用,本站不存在私自接受协助用户架设游戏或资源,非法运营资源行为。
 
在线客服
点击这里给我发消息 点击这里给我发消息 点击这里给我发消息
售前咨询热线
312337667

微信扫一扫,私享最新原创实用干货

QQ|免责声明|小黑屋|依星资源网 ( 鲁ICP备2021043233号-3 )|网站地图

GMT+8, 2024-12-27 23:50

Powered by Net188.com X3.4

邮箱:312337667@qq.com 客服QQ:312337667(工作时间:9:00~21:00)

快速回复 返回顶部 返回列表