Written by 🍺 on August 2, 2023
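Dynamically creating queue consumers at runtime in Spring Boot
How it works
The core type is the AmqpAdmin interface, together with its implementation class RabbitAdmin. It provides the methods for declaring queues, exchanges, and bindings: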
/**
 * Declare the given queue.
 * @param queue the queue to declare.
 * @return the name of the queue.
 */
@Nullable
String declareQueue(Queue queue);

/**
 * Declare an exchange.
 * @param exchange the exchange to declare.
 */
void declareExchange(Exchange exchange);

/**
 * Declare a binding of a queue to an exchange.
 * @param binding a description of the binding to declare.
 */
void declareBinding(Binding binding);
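Inject this type into the relevant service and you can use these methods to declare queues and bind them to exchanges at runtime.
One thing to watch out for: inject the bean as AmqpAdmin, not as RabbitAdmin, otherwise the injection fails. The reason is that the RabbitMQ auto-configuration declares the bean with the return type AmqpAdmin: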
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}
Full code
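import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PreDestroy; // jakarta.annotation.PreDestroy on Spring Boot 3+
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.Lifecycle;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class MqFactory {
    // EXCHANGE and KEY_PREFIX are constants that are not shown in this listing.
    private final SimpleRabbitListenerContainerFactory containerFactory;
    private final AmqpAdmin rabbitAdmin;
    private final List<MessageListenerContainer> messageListenerContainerList = new ArrayList<>();

    public void createMessageListener(String tableName) {
        log.info("- create message listener for model: {}", tableName);
        // Create the queue, exchange, and binding; the binding key is data.sync.key.<table name>.
        // An empty queue name asks the broker to generate one.
        Queue queue = new Queue("");
        TopicExchange exchange = new TopicExchange(EXCHANGE);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(KEY_PREFIX + tableName);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(binding);

        // Build a container from the shared factory and attach a listener for this table.
        SimpleMessageListenerContainer container = containerFactory.createListenerContainer();
        container.setQueues(queue);
        container.setMessageListener(message -> handleMessage(tableName, message));
        container.start();
        messageListenerContainerList.add(container);
        log.info("- create message listener finish");
    }

    private void handleMessage(String tableName, Message message) {
        byte[] body = message.getBody();
        // Decode explicitly (assuming UTF-8 payloads) instead of using the platform default charset.
        String json = new String(body, StandardCharsets.UTF_8);
        log.info("- handle table: {} message: {}", tableName, json);
    }

    @PreDestroy
    public void destroy() {
        messageListenerContainerList.forEach(Lifecycle::stop);
        log.info("- stop all message listeners...");
    }
}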
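To make the binding rule in the listing concrete, here is a rough plain-Java sketch of how a topic exchange decides whether a routing key matches a binding key. It is only a model of the usual AMQP topic rules (`*` stands for exactly one word, `#` for zero or more words) and is not part of Spring AMQP or RabbitMQ. `TopicKeyMatcher` and `bindingKeyFor` are hypothetical names, and the `KEY_PREFIX` value is an assumption taken from the `data.sync.key.<table name>` comment in the listing.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of topic-exchange key matching; hypothetical helper, not a library class. */
final class TopicKeyMatcher {

    // Assumed value, inferred from the "data.sync.key.<table name>" comment in the listing.
    static final String KEY_PREFIX = "data.sync.key.";

    private TopicKeyMatcher() {
    }

    /** Builds the binding key the same way the listing does: prefix + table name. */
    static String bindingKeyFor(String tableName) {
        return KEY_PREFIX + tableName;
    }

    /** Returns true if routingKey matches bindingKey under the topic rules described above. */
    static boolean matches(String bindingKey, String routingKey) {
        List<String> pattern = Arrays.asList(bindingKey.split("\\.", -1));
        List<String> words = Arrays.asList(routingKey.split("\\.", -1));
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(List<String> pattern, int p, List<String> words, int w) {
        if (p == pattern.size()) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return w == words.size();
        }
        String token = pattern.get(p);
        if (token.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = w; next <= words.size(); next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.size()) {
            return false;
        }
        // '*' matches exactly one word; any other token must match literally.
        return (token.equals("*") || token.equals(words.get(w)))
                && matches(pattern, p + 1, words, w + 1);
    }
}
```

Because the binding key built in `createMessageListener` contains no wildcards, each dynamically created queue only receives messages whose routing key is exactly the prefix plus its own table name. In this sketch, `TopicKeyMatcher.matches(TopicKeyMatcher.bindingKeyFor("user"), "data.sync.key.user")` is true, while the same binding key does not match `data.sync.key.order`.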