• 大小: 9KB
    文件类型: .rar
    金币: 2
    下载: 1 次
    发布日期: 2021-06-19
  • 语言: Java
  • 标签: java  Recket  demo  

资源简介

java编写的RocketMQ入门demo,maven 更新依赖,可直接运行Producer和Consumer 简单进行测试

资源截图

代码片段和文件信息

package com.kang.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
 
import java.util.List;
 
public class Consumer {
 
    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。

     * 但是实际PushConsumer内部是使用长轮询Pull方式从metaQ服务器拉消息,然后再回调用户Listener方法

     */
    public static void main(String[] args) throws InterruptedException
            MQClientException {
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例

         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                “ConsumerGroupName“);
        consumer.setNamesrvAddr(“127.0.0.1:9876“);
        consumer.setInstanceName(“Consumber“);
 
        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe(“TopicTest1“ “TagA || TagC || TagD“);
        /**
         * 订阅指定topic下所有消息

         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe(“TopicTest2“ “*“);
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            /**
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List msgs ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName()
                        + “ Receive New Messages: “ + msgs.size());
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals(“TopicTest1“)) {
                    // 执行TopicTest1的消费逻辑
                    if (msg.getTags() != null && msg.getTags().equals(“TagA“)) {
                        // 执行TagA的消费
                        System.out.println(new String(msg.getBody()));
                    } else if (msg.getTags() != null
                            && msg.getTags().equals(“TagC“)) {
                        // 执行TagC的消费
                    } else if (msg.getTags() != null
                            && msg.getTags().equals(“TagD“)) {
                        // 执行TagD的消费
                    }
                } else if (msg.getTopic().equals(“TopicTest2“)) {
                    System.out.println(new String(msg.getBody()));
                }
 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可

         */
        consumer.start();
 
        System.out.println(“Consumer Started.“);
    }
}

 属性            大小     日期    时间   名称
----------- ---------  ---------- -----  ----

     文件       1433  2019-03-26 16:29  RocketMQ\.classpath

     文件        560  2019-03-26 15:04  RocketMQ\.project

     文件        191  2019-03-26 16:29  RocketMQ\.settings\org.eclipse.core.resources.prefs

     文件        243  2019-03-26 16:29  RocketMQ\.settings\org.eclipse.jdt.core.prefs

     文件         90  2019-03-26 15:04  RocketMQ\.settings\org.eclipse.m2e.core.prefs

     文件       1485  2019-03-26 16:30  RocketMQ\pom.xml

     文件       3414  2019-03-26 16:33  RocketMQ\src\main\java\com\kang\rocketmq\Consumer.java

     文件       3562  2019-03-26 16:32  RocketMQ\src\main\java\com\kang\rocketmq\Producer.java

     文件       2571  2019-03-26 16:33  RocketMQ\target\classes\com\kang\rocketmq\Consumer$1.class

     文件       1381  2019-03-26 16:33  RocketMQ\target\classes\com\kang\rocketmq\Consumer.class

     文件       2202  2019-03-26 16:32  RocketMQ\target\classes\com\kang\rocketmq\Producer.class

     目录          0  2019-03-26 15:15  RocketMQ\src\main\java\com\kang\rocketmq

     目录          0  2019-03-26 15:10  RocketMQ\src\main\java\com\kang

     目录          0  2019-03-26 16:30  RocketMQ\target\classes\com\kang\rocketmq

     目录          0  2019-03-26 15:10  RocketMQ\src\main\java\com

     目录          0  2019-03-26 16:30  RocketMQ\target\classes\com\kang

     目录          0  2019-03-26 15:10  RocketMQ\src\main\java

     目录          0  2019-03-26 15:04  RocketMQ\src\main\resources

     目录          0  2019-03-26 15:04  RocketMQ\src\test\java

     目录          0  2019-03-26 15:04  RocketMQ\src\test\resources

     目录          0  2019-03-26 16:30  RocketMQ\target\classes\com

     目录          0  2019-03-26 15:04  RocketMQ\src\main

     目录          0  2019-03-26 15:04  RocketMQ\src\test

     目录          0  2019-03-26 16:30  RocketMQ\target\classes

     目录          0  2019-03-26 15:04  RocketMQ\target\test-classes

     目录          0  2019-03-26 16:29  RocketMQ\.settings

     目录          0  2019-03-26 15:04  RocketMQ\src

     目录          0  2019-03-26 15:04  RocketMQ\target

     目录          0  2019-03-26 15:04  RocketMQ

----------- ---------  ---------- -----  ----

............此处省略2个文件信息

评论

共有 条评论