First, download RocketMQ. Starting it requires specifying the RocketMQ home directory.
cd github
git clone -b develop https://github.com/apache/incubator-rocketmq.git
whatsmars-mq
  |-src
    |-main
      |-java
        |-com.itlong.whatsmars.mq.rocketmq.quickstart
            BrokerStartup.java
            Consumer.java
            NamesrvStartup.java
            Producer.java
      |-resource
          conf.properties
  pom.xml
Dependencies:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-namesrv -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-namesrv</artifactId>
        <version>4.0.0-incubating</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-broker -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-broker</artifactId>
        <version>4.0.0-incubating</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.0.0-incubating</version>
    </dependency>
</dependencies>
conf.properties
rocketmqHome=D:\\github\\incubator-rocketmq\\distribution
namesrvAddr=127.0.0.1:9876
mapedFileSizeCommitLog=52428800
mapedFileSizeConsumeQueue=30000
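The doubled backslashes in rocketmqHome are not a typo. Assuming the file passed with -c is read with java.util.Properties (which is how the RocketMQ 4.0.0 startup classes appear to load it), a backslash is an escape character, so `\\` in the file becomes a single `\` in the value. The sketch below parses the same four lines from a string to show the effect. The class name ConfPreview is made up for illustration and is not part of the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfPreview {

    /** Parses text written in java.util.Properties syntax. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same content as conf.properties; "\\\\" in Java source is "\\" in the file.
        String text = String.join("\n",
            "rocketmqHome=D:\\\\github\\\\incubator-rocketmq\\\\distribution",
            "namesrvAddr=127.0.0.1:9876",
            "mapedFileSizeCommitLog=52428800",
            "mapedFileSizeConsumeQueue=30000");
        Properties props = parse(text);

        // The escape is resolved on load: D:\github\incubator-rocketmq\distribution
        System.out.println(props.getProperty("rocketmqHome"));

        // 52428800 bytes is exactly 50 MiB.
        long commitLogBytes = Long.parseLong(props.getProperty("mapedFileSizeCommitLog"));
        System.out.println(commitLogBytes / (1024 * 1024) + " MiB per commit log file");
    }
}
```

Running it prints the home directory with single backslashes, followed by "50 MiB per commit log file". On Windows, forward slashes in the path avoid the escaping question altogether.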
Name server (a service similar to ZooKeeper):
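package com.itlong.whatsmars.mq.rocketmq.quickstart;

public class NamesrvStartup {

    public static void main(String[] args) {
        // Locate conf.properties at the root of the classpath.
        String classpath = NamesrvStartup.class.getResource("/").getPath();
        // Pass the config file to the real name server launcher via -c.
        args = new String[] {"-c", classpath + "conf.properties"};
        org.apache.rocketmq.namesrv.NamesrvStartup.main(args);
    }
}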
Broker:
public class BrokerStartup {

    public static void main(String[] args) {
        String classpath = BrokerStartup.class.getResource("/").getPath();
        args = new String[] {"-c", classpath + "conf.properties"};
        org.apache.rocketmq.broker.BrokerStartup.main(args);
        System.out.println("Broker started.");
    }
}
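Both launchers build the config path from getResource("/").getPath(). One thing to watch for: URL.getPath() returns the path still URL-encoded, so a project directory containing a space would show up as %20 and the file would not be found. A minimal sketch of an alternative, assuming the classpath root is a plain file: URL, is to go through a URI instead. ConfigLocator and its resolve method are hypothetical names used only for this illustration.

```java
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ConfigLocator {

    /** Resolves fileName against a file: URL, decoding escapes such as %20. */
    public static Path resolve(String classpathRootUrl, String fileName) {
        return Paths.get(URI.create(classpathRootUrl)).resolve(fileName);
    }

    public static void main(String[] args) {
        URL root = ConfigLocator.class.getResource("/");
        if (root == null) {
            // May happen when no directory is on the classpath, e.g. jar-only setups.
            System.err.println("No directory classpath root found");
            return;
        }
        Path conf = resolve(root.toString(), "conf.properties");
        System.out.println(conf + " exists: " + conf.toFile().exists());
    }
}
```

The resulting path could then be passed as the -c argument in place of the concatenated string.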
Consumer:
package com.itlong.whatsmars.mq.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * This example shows how to subscribe to and consume messages using the provided {@link DefaultMQPushConsumer}.
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        consumer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe to one or more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         * Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                // Pass values as arguments so a '%' in a message cannot break the format string.
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         * Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
Producer:
package com.itlong.whatsmars.mq.rocketmq.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * This class demonstrates how to send messages to brokers using the provided {@link DefaultMQProducer}.
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        producer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is no longer in use.
         */
        producer.shutdown();
    }
}
conf.properties specifies rocketmqHome, namesrvAddr, and so on. Start NamesrvStartup, BrokerStartup, Consumer, and Producer, in that order.
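The order matters because the broker registers itself with the name server, and the consumer and producer look up brokers through it. A rough way to confirm each step before moving to the next is to check that the expected port accepts TCP connections. The sketch below uses only the JDK. PortProbe is a hypothetical helper, 9876 is the namesrvAddr port from conf.properties, and 10911 is assumed to be the broker's default listen port (it is not set in the config above).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Name server port taken from namesrvAddr in conf.properties.
        System.out.println("name server up: " + isListening("127.0.0.1", 9876, 1000));
        // Assumed default broker port; adjust if listenPort is configured differently.
        System.out.println("broker up: " + isListening("127.0.0.1", 10911, 1000));
    }
}
```

An open port only shows that the process is accepting connections, not that the broker has finished registering, so treat this as a quick sanity check rather than a readiness guarantee.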
Message management console: https://github.com/javahongxi/whatsmars/tree/master/rocketmq-console
RocketMQ principles and practice: http://wely.iteye.com/blog/2392089