软件环境:
JDK1.8
alibaba-rocketmq-3.2.6.tar.gz
本文安装和测试都在同一台电脑中进行,所有IP都是127.0.0.1,
其中NameServer一个,Broker一个,Producer一个,Consumer一个。
下载编译好的RocketMQ的发布版,也可自行编译source code
https://github.com/alibaba/RocketMQ/releases
启动name server
进入解压的bin目录 双击mqnamesrv.exe,启动nameserver,保持mqnamesrv.exe运行,不要关闭这个终端。
启动broker
进入解压的bin目录,打开一个终端执行 mqbroker.exe -n localhost:9876 autoCreateTopicEnable=true
启动顺序必须是先启动nameserver,然后启动broker
以通过jps查看一下是不是有了RocketMQ的进程
jps -v
Java发送和接收测试
增加maven依赖配置
com.alibaba.rocketmq rocketmq-client 3.2.6
package com.zns.rocketmq;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class ProducerTest { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("producerGroup1"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("instance1"); producer.start(); System.out.println("开始发送数据"); try { for (int i = 0; i < 3; i++) { Message msg = new Message("mytopic", "mytag", ("hello world " + i).getBytes()); SendResult sendResult = producer.send(msg); System.out.println("发送成功 " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }}
package com.zns.rocketmq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;public class ConsumerTest { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producerGroup1"); consumer.setNamesrvAddr("127.0.0.1:9876"); System.out.println("开始接收数据"); try { // 设置topic和标签 consumer.subscribe("mytopic", "mytag"); // 程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println("收到数据:" + new String(msg.getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } }}
先运行生产者,再运行消费者,测试成功!