Quick Start
This chapter demonstrates the basic use of RocketMQ on its own, and then RocketMQ combined with Spring Cloud Stream, using broadcast consumption as the example.
Basic usage of RocketMQ
Install and start
Download the latest RocketMQ binary release and decompress it.
The directory structure after decompression is as follows:
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
- Start NameServer
Linux / Unix / Mac operating system, execute the command:
```shell
$ nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log
```
Windows operating system, execute the command:
```cmd
$ .\bin\mqnamesrv.cmd
```
- Start Broker
Linux / Unix / Mac operating system, execute the command:
```bash
$ nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
```
Windows operating system, execute the command:
```cmd
$ .\bin\mqbroker.cmd -n localhost:9876
```
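Before moving on to the tests, it can help to confirm that something is accepting connections on the NameServer address `localhost:9876` used above. The following is a minimal sketch using only the JDK; `NameServerCheck` and `isReachable` are hypothetical names, not part of RocketMQ. A successful connection only shows that the port is open, not that the process behind it is a healthy NameServer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ: checks whether a TCP port accepts connections.
final class NameServerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9876 is the NameServer address used throughout this chapter.
        boolean open = isReachable("localhost", 9876, 1000);
        System.out.println(open ? "Port 9876 is open" : "Port 9876 is not reachable");
    }
}
```

If the port is not reachable, check `~/logs/rocketmqlogs/namesrv.log` for startup errors.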
Test sending and receiving messages
- Create a test topic (TopicTest):
Linux / Unix / Mac operating system, execute the command:
```shell
$ sh bin/mqadmin updateTopic -n localhost:9876 -t TopicTest -c DefaultCluster
```
Windows operating system, execute the command:
```cmd
$ .\bin\mqadmin.cmd updateTopic -n localhost:9876 -t TopicTest -c DefaultCluster
```
- Send a message:
Linux / Unix / Mac operating system, execute the command:
```shell
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
```
Windows operating system, execute the command:
```cmd
$ .\bin\tools.cmd org.apache.rocketmq.example.quickstart.Producer
```
After sending successfully, it will display: SendResult [sendStatus=SEND_OK, msgId= ...
- Receive a message:
Linux / Unix / Mac operating system, execute the command:
```shell
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
```
Windows operating system, execute the command:
```cmd
$ .\bin\tools.cmd org.apache.rocketmq.example.quickstart.Consumer
```
After receiving successfully, it will display: ConsumeMessageThread_%d Receive New Messages: [MessageExt...
Broadcast consumption example
In broadcast consumption, every message is delivered to every consumer. Use it when all consumer instances in the same consumer group must each receive every message on a topic.
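The difference from the default clustering mode can be illustrated with a toy in-memory model. This is a sketch, not RocketMQ code: `DeliveryModes`, `broadcast`, and `cluster` are hypothetical names, and the round-robin split only approximates clustering, where RocketMQ actually distributes queues among the instances of a group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, not RocketMQ code: contrasts two delivery modes within one consumer group.
final class DeliveryModes {

    // Broadcasting: every consumer instance in the group receives every message.
    static Map<String, List<String>> broadcast(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>(messages));
        }
        return received;
    }

    // Clustering: each message is handled by exactly one instance (round-robin for simplicity).
    static Map<String, List<String>> cluster(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size());
            received.get(target).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer1", "consumer2");
        List<String> messages = List.of("m0", "m1", "m2", "m3");
        System.out.println("broadcast: " + broadcast(consumers, messages));
        System.out.println("cluster:   " + cluster(consumers, messages));
    }
}
```

In the broadcast case both consumers end up with all four messages, while in the clustering case the messages are split between them.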
Introduce dependencies
A starter whose group ID is com.alibaba.cloud and artifact ID is spring-cloud-starter-stream-rocketmq needs to be introduced in the pom.xml file:
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
Update topic
Create (or update) the topic named broadcast in RocketMQ. Broadcast mode itself is set on the consumer side through the messageModel property.
Linux / Unix / Mac operating system, execute the command:
```shell
$ sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
```
Windows operating system, execute the command:
```cmd
$ .\bin\mqadmin.cmd updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
```
Create producer
- Related configuration
Complete the configuration of the RocketMQ broadcast message producer in the /src/main/resources/application.yaml configuration file of the application. Configure the service port number, the NameServer address of RocketMQ, and the destination for sending messages as follows:
```yaml
server:
  port: 28085
spring:
  application:
    name: rocketmq-broadcast-producer-example
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
      bindings:
        producer-out-0:
          destination: broadcast
logging:
  level:
    org.springframework.context.support: debug
```
- Send a message
Inject StreamBridge into the application with the @Autowired annotation to send messages. The producer method sends 100 messages in a loop. For each message, it builds a headers map containing the message key and an origin message ID, wraps a SimpleMsg payload and the headers in a GenericMessage object, and then uses the streamBridge.send method to send the message to the binding named "producer-out-0".
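```java
import java.util.HashMap;
import java.util.Map;

import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

// SimpleMsg is the example's payload class; its import depends on where it lives in your project.
@SpringBootApplication
public class RocketMQBroadcastProducerApplication {
    private static final Logger log = LoggerFactory.getLogger(RocketMQBroadcastProducerApplication.class);
    @Autowired
    private StreamBridge streamBridge;
    public static void main(String[] args) {
        SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
    }
    // Runs once at startup and sends 100 messages to the producer-out-0 binding.
    @Bean
    public ApplicationRunner producer() {
        return args -> {
            for (int i = 0; i < 100; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
                Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
                streamBridge.send("producer-out-0", msg);
            }
        };
    }
}
```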
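The producer above uses a SimpleMsg payload class that this chapter does not show. The following is a minimal sketch of what it might look like, assuming only what the surrounding code requires: a constructor taking a String and a `getMsg()` accessor. The no-argument constructor and setter are additions on the assumption that the payload is converted to and from JSON, which is typically the case with Spring Cloud Stream's default content type.

```java
// Assumed shape of the example's SimpleMsg payload class; the real definition is not shown in this chapter.
class SimpleMsg {

    private String msg;

    // No-argument constructor so that a JSON converter can instantiate the class on the consumer side.
    public SimpleMsg() {
    }

    public SimpleMsg(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "SimpleMsg{msg='" + msg + "'}";
    }
}
```

In a real project the class would normally be declared public in its own SimpleMsg.java file so that both the producer and the consumer applications can import it.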
Create consumer
Start two consumer instances.
consumer 1
- Related configuration
Complete the relevant configuration of RocketMQ broadcast message consumer 1 in the /src/main/resources/application.yaml configuration file of the application. Configure the service port number, consumer function, RocketMQ NameServer address, message consumption mode and consumer group as follows:
```yaml
server:
  port: 28084
spring:
  application:
    name: rocketmq-broadcast-consumer1-example
  cloud:
    stream:
      function:
        definition: consumer;
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          consumer-in-0:
            consumer:
              messageModel: BROADCASTING
      bindings:
        consumer-in-0:
          destination: broadcast
          group: broadcast-consumer
logging:
  level:
    org.springframework.context.support: debug
```
- Receive message
The consumer method, annotated with @Bean, returns a Consumer object that processes received messages. The consumer function is defined with a lambda expression that logs the content of each new message it receives.
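```java
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

// SimpleMsg is the example's payload class; its import depends on where it lives in your project.
@SpringBootApplication
public class RocketMQBroadcastConsumer1Application {
    private static final Logger log = LoggerFactory.getLogger(RocketMQBroadcastConsumer1Application.class);
    public static void main(String[] args) {
        SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
    }
    // The bean name "consumer" matches the function definition and the consumer-in-0 binding.
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> {
            log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
        };
    }
}
```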
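The binding name consumer-in-0 used in the configuration follows Spring Cloud Stream's functional naming convention: the function name, then `in` or `out`, then the argument index. The following is a small sketch of that convention using only the JDK; `BindingNames` is a hypothetical helper, not a Spring API, and it ignores composed functions (definitions containing `|`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring API: derives functional binding names from a function definition.
final class BindingNames {

    // For each function name in a ';'-separated definition, returns "<name>-<direction>-<index>".
    static List<String> bindingNames(String definition, String direction, int index) {
        List<String> names = new ArrayList<>();
        for (String function : definition.split(";")) {
            String name = function.trim();
            if (!name.isEmpty()) {
                names.add(name + "-" + direction + "-" + index);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // "consumer;" is the spring.cloud.stream.function.definition value used in this chapter.
        System.out.println(bindingNames("consumer;", "in", 0));
    }
}
```

If the bean method were renamed, both the function definition and the binding keys in the configuration file would need to change to match.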
consumer 2
Consumer 2 is created the same way as consumer 1; refer to the explanation above. Only the port number, the application name, and the log prefix differ.
- Related configuration
application.yml configuration:
```yaml
server:
  port: 28083
spring:
  application:
    name: rocketmq-broadcast-consumer2-example
  cloud:
    stream:
      function:
        definition: consumer;
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          consumer-in-0:
            consumer:
              messageModel: BROADCASTING
      bindings:
        consumer-in-0:
          destination: broadcast
          group: broadcast-consumer
logging:
  level:
    org.springframework.context.support: debug
```
- Receive message
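```java
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

// SimpleMsg is the example's payload class; its import depends on where it lives in your project.
@SpringBootApplication
public class RocketMQBroadcastConsumer2Application {
    private static final Logger log = LoggerFactory.getLogger(RocketMQBroadcastConsumer2Application.class);
    public static void main(String[] args) {
        SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
    }
    // The bean name "consumer" matches the function definition and the consumer-in-0 binding.
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> {
            log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
        };
    }
}
```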
Besides broadcast consumption, RocketMQ also supports sequential consumption, delayed consumption, message filtering, and transactional messages. For more simple examples of RocketMQ, please refer to RocketMQ example.
This demo covers only the basic use of RocketMQ combined with Spring Cloud Stream. For more advanced features of RocketMQ, please refer to the RocketMQ Advanced Guide chapter.