博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ之Windows下安装及发送接收实例
阅读量:5881 次
发布时间:2019-06-19

本文共 3121 字,大约阅读时间需要 10 分钟。

软件环境:

JDK1.8

alibaba-rocketmq-3.2.6.tar.gz

 

本文安装和测试都在同一台电脑中进行,所有IP都是127.0.0.1,

其中NameServer一个,Broker一个,Producer一个,Consumer一个。

 

下载编译好的RocketMQ的发布版,也可自行编译source code

https://github.com/alibaba/RocketMQ/releases

 

启动name server

进入解压的bin目录 双击mqnamesrv.exe,启动nameserver,保持mqnamesrv.exe运行,不要关闭这个终端。

 

启动broker

进入解压的bin目录,打开一个终端执行 mqbroker.exe -n localhost:9876 autoCreateTopicEnable=true

 

启动顺序必须是先启动nameserver,然后启动broker

 

以通过jps查看一下是不是有了RocketMQ的进程

jps -v 

 

Java发送和接收测试

 

增加maven依赖配置

com.alibaba.rocketmq
rocketmq-client
3.2.6

 

package com.zns.rocketmq;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class ProducerTest {    public static void main(String[] args) throws Exception {        DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");        producer.setNamesrvAddr("127.0.0.1:9876");        producer.setInstanceName("instance1");        producer.start();        System.out.println("开始发送数据");        try {            for (int i = 0; i < 3; i++) {                Message msg = new Message("mytopic", "mytag", ("hello world " + i).getBytes());                SendResult sendResult = producer.send(msg);                System.out.println("发送成功 " + new String(msg.getBody()));            }        } catch (Exception e) {            e.printStackTrace();        }        producer.shutdown();    }}

 

package com.zns.rocketmq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;public class ConsumerTest {    public static void main(String[] args) {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producerGroup1");        consumer.setNamesrvAddr("127.0.0.1:9876");        System.out.println("开始接收数据");        try {            // 设置topic和标签            consumer.subscribe("mytopic", "mytag");            // 程序第一次启动从消息队列头取数据            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);            consumer.registerMessageListener(new MessageListenerConcurrently() {                public ConsumeConcurrentlyStatus consumeMessage(List
list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println("收到数据:" + new String(msg.getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } }}

 

先运行生产者,再运行消费者,测试成功!

转载于:https://www.cnblogs.com/zengnansheng/p/10389758.html

你可能感兴趣的文章
截取字符串中两个字符串中的字符串
查看>>
spring xml properties split with comma for list
查看>>
判断点是否在三角形内
查看>>
Android实战简易教程-第二十三枪(基于Baas的用户注冊验证username是否反复功能!)...
查看>>
在odl中怎样实现rpc
查看>>
leetcode 110 Balanced Binary Tree
查看>>
python活用isdigit方法显示系统进程
查看>>
项目开发总结
查看>>
知行合一
查看>>
jmeter插件之jsonpath提取响应结果和做断言
查看>>
发布支持多线程的PowerShell模块 —— MultiThreadTaskRunner
查看>>
Ubuntu ctrl+alt会导致窗口还原的问题
查看>>
第四十期百度技术沙龙笔记整理
查看>>
推荐系统那点事 —— 基于Spark MLlib的特征选择
查看>>
linux 下RTL8723/RTL8188调试记录(命令行)【转】
查看>>
SpringMVC案例1——对User表进行CRUD操作
查看>>
[Contiki系列论文之1]Contiki——为微传感器网络而生的轻量级的、灵活的操作系统...
查看>>
Android 网络编程 记录
查看>>
微软同步发行Windows 10和Windows 10 Mobile系统更新
查看>>
Zeppelin的入门使用系列之使用Zeppelin运行shell命令(二)
查看>>