Written by
🍺
on August 2, 2023
Springboot运行时动态创建队列消费者
原理
核心类是AmqpAdmin
以及其实现类RabbitAdmin
,这个类包含了注册queue、exchange以及binding的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Declare the given queue.
* @param queue the queue to declare.
* @return the name of the queue.
*/
@Nullable
String declareQueue (Queue queue);
/**
* Declare an exchange.
* @param exchange the exchange to declare.
*/
void declareExchange (Exchange exchange);
/**
* Declare a binding of a queue to an exchange.
* @param binding a description of the binding to declare.
*/
void declareBinding (Binding binding);
我们只需要注入该类到相应的Service中,即可利用这几个方法实现运行时动态注册queue,动态绑定queue和exchange;
这里要注意一个问题:注入bean的时候要用AmqpAdmin ,不能用RabbitAdmin,否则注不进去。因为rabbitMq自动配置生成的地方用的是AmqpAdmin:
1
2
3
public AmqpAdmin amqpAdmin (ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Service
@Slf4j
@RequiredArgsConstructor
public class MqFactory {
private final SimpleRabbitListenerContainerFactory containerFactory;
private final AmqpAdmin rabbitAdmin;
private final List< MessageListenerContainer> messageListenerContainerList = new ArrayList<> ();
public void createMessageListener (String tableName) {
log.info ("- create message listener for model: {}" , tableName);
//生成队列、交换机和绑定规则,绑定key为 = data.sync.key.表名称
Queue queue = new Queue("" );
TopicExchange exchange = new TopicExchange(EXCHANGE);
Binding binding = BindingBuilder.bind (queue).to (exchange).with (KEY_PREFIX + tableName);
rabbitAdmin.declareQueue (queue);
rabbitAdmin.declareExchange (exchange);
rabbitAdmin.declareBinding (binding);
SimpleMessageListenerContainer container = containerFactory.createListenerContainer ();
container.setQueues (queue);
container.setMessageListener (message -> handleMessage(tableName, message));
container.start ();
messageListenerContainerList.add (container);
log.info ("- create message listener finish" );
}
private void handleMessage (String tableName, Message message) {
byte [] body = message.getBody ();
String json = new String(body);
log.info ("- handle table: {} message: {}" , tableName, json);
}
@PreDestroy
public void destroy () {
messageListenerContainerList.forEach (Lifecycle::stop);
log.info ("- stop all message listeners..." );
}
}