查看: 164|回复: 0

    【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

    [复制链接]

    248

    主题

    248

    帖子

    595

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    595
    发表于 2022-4-24 09:47:23 | 显示全部楼层 |阅读模式
    【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)
    1.引入依赖
    1. <dependency>
    2.             <groupId>org.apache.rocketmq</groupId>
    3.             <artifactId>rocketmq-client</artifactId>
    4.             <version>4.3.2</version>
    5.         </dependency>
    6.         <dependency>
    7.             <groupId>org.apache.rocketmq</groupId>
    8.             <artifactId>rocketmq-spring-boot-starter</artifactId>
    9.             <version>2.0.3</version>
    10.         </dependency>
    复制代码
    2.Producer
    public class MyProducer {

        public static void main(String[] args) throws Exception{
            // 构造Producer时,必须指定groupId
            DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
            //指定NameServer的地址  只用namesrv的地址就行,它会从namesrv上拿到broker的地址和topic信息
            producer.setNamesrvAddr("localhost:9876");
            //启动生产者
            producer.start();

            int num = 0;
            while (num < 20) {
                num++;
                 /**
                  * rocketmq封装了Message
                       * String topic,
                       * String tags, 标签(分类)---> 筛选
                       * byte[] body
                       */
                Message message = new Message("my_test_topic", "", ("hello rocketmq:" + num).getBytes());
                //同步发送  发送消息,拿到返回SendResult
                SendResult result = producer.send(message);
                System.out.println(result);
            }
            //关闭生产者
            producer.shutdown();
        }
    }
    1. <blockquote>public class MyProducer {
    复制代码

    启动并发送消成功后,返回的SendResult如下:
    003.png
    SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态

    FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成 SYNC_FLUSH 才会报这个错误)
    FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
    SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
    SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK


    001.png
    3.Consumer

    1. public class MyConsumer {

    2.     public static void main(String[] args) throws MQClientException {
    3.         // 构造Consumer时,必须指定groupId
    4.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
    5.         consumer.setNamesrvAddr("localhost:9876"); // nameServer地址,用于获取broker、topic信息
    6.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    7.         // 指定订阅的主题与tag,通过tag可以定制性消费(*表示全部tag)
    8.         consumer.subscribe("my_test_topic", "*");
    9.                
    10.         // 异步消费
    11.         consumer.registerMessageListener(new MessageListenerConcurrently() {
    12.             @Override
    13.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    14.                                                             ConsumeConcurrentlyContext context) {
    15. //                System.out.println("Receive Message:" + msgs.toString());
    16.                     // 1 try catch(throwable)确保不会因为业务逻辑的异常,导致消息出现重复消费的现象
    17.                 // 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获,
    18.                 //   并且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
    19.                     try {
    20.                             for(MessageExt msg:msgs){
    21.                                     String msgbody = new String(msg.getBody(), "utf-8");
    22.                                     System.out.println(" MessageBody: "+ msgbody);//输出消息内容
    23.                             }
    24.                     } catch (Exception e) {
    25.                             e.printStackTrace();
    26.                             return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
    27.                     }
    28.                     
    29.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收
    30.             }
    31.         });
    32.         //启动消费者
    33.         consumer.start();
    34.         System.out.println("消费者启动成功。。。");
    35.     }
    36. }
    复制代码
    收到消息的内容:

    004.png

    consumer Group:位于同一个consumer Group中的consumer实例

    和producer Group中的各个produer实例承担的角色类似
    同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
    和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
    同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。
    002.png






    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    免责声明:
    1、转载或引用本网站内容须注明原网址,并标明本网站网址“源码资源网”
    2、转载或引用本网站中的署名文章,请按规定向原作者支付稿酬
    3、对于不当转载或引用本网站内容而引起的民事纷争、行政处理或其他损失,本网站不承担责任
    4、对不遵守本声明或其他违法、恶意使用本网站内容者,本网站保留追究其法律责任的权利
    5、资源部分来自网络,不保证资源的完整性,仅供学习研究,如需运营请购买正版,如有侵权请联系客服删除
    6、本站所有资源不带技术支持,下载资源请24小时内删除,如用于违法用途,或者商业用途,一律用于者承担

    QQ|手机版|小黑屋|源码资源网 ( 鲁ICP备2021043233号-3 )

    GMT+8, 2022-8-18 22:08

    邮箱:312337667@qq.com 客服QQ:312337667(工作时间:7:00~23:00)

    © Powered by Net188.com X3.4

    快速回复 返回顶部 返回列表