
Commit cdc45d3

Dynamically modify topics
1 parent 89cf25e commit cdc45d3

File tree: 7 files changed (+165 −0 lines)


springboot-mq-idempotent-consume/ReadME.md

Lines changed: 5 additions & 0 deletions
@@ -7,4 +7,9 @@
 - 3. If the ID has already been consumed, the message is not processed again
 - 4. If the ID has not been consumed yet, process the message; once processing finishes, store the ID in Redis with its value set to "consumed"

+Additional features
+
+- Rewrite topics via a producer interceptor
+- Rewrite the topics of @KafkaListener by dynamically modifying the annotation attributes
+
 > Note: the consumer must use manual acknowledgement; automatic acknowledgement is not allowed
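For context, here is a minimal sketch of the manual-acknowledgement, Redis-backed consumer that the README above describes. The listener class, the Redis key layout, and MessageDTO#getId() are assumptions made for illustration; they are not code from this commit.

package com.github.lybgeek.kafka.consumer;

import com.github.lybgeek.kafka.constant.Constant;
import com.github.lybgeek.kafka.dto.MessageDTO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class IdempotentMsgListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @KafkaListener(topics = Constant.TOPIC)
    public void onMessage(ConsumerRecord<String, MessageDTO> record, Acknowledgment ack) {
        String key = "consumed:" + record.value().getId();   // assumed key layout and getter
        // step 3: if the ID has already been consumed, skip processing
        if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
            ack.acknowledge();
            return;
        }
        // step 4: process the message, then record the ID in Redis as consumed
        // ... business processing ...
        redisTemplate.opsForValue().set(key, "consumed");
        ack.acknowledge();   // manual acknowledgement, as the README requires
    }
}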

springboot-mq-idempotent-consume/pom.xml

Lines changed: 9 additions & 0 deletions
@@ -24,6 +24,15 @@
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.alibaba</groupId>
+      <artifactId>fastjson</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.reflections</groupId>
+      <artifactId>reflections</artifactId>
+    </dependency>

  </dependencies>

springboot-mq-idempotent-consume/src/main/java/com/github/lybgeek/kafka/config/KakfaConfig.java

Lines changed: 7 additions & 0 deletions
@@ -1,6 +1,7 @@
 package com.github.lybgeek.kafka.config;


+import com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor;
 import com.github.lybgeek.kafka.serialization.ObjectDeserializer;
 import com.github.lybgeek.kafka.serialization.ObjectSerializer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -19,7 +20,9 @@
 import org.springframework.kafka.core.*;
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;


@@ -82,6 +85,10 @@ private Map<String, Object> producerConfigs() {
     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
+    // register the producer interceptor
+    List<String> interceptors = new ArrayList<>();
+    interceptors.add(KafkaProducerInterceptor.class.getName());
+    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
     return props;
   }

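With the interceptor on the producer's interceptor chain, every record sent through the template built from producerConfigs() passes through its onSend() before serialization, so the caller still addresses the logical topic while the broker receives the prefixed one. A hedged usage sketch follows; the MsgSender class, its package, and the synchronous get() call are illustrative assumptions, not part of this commit.

package com.github.lybgeek.kafka.producer;

import com.github.lybgeek.kafka.constant.Constant;
import com.github.lybgeek.kafka.dto.MessageDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

@Component
public class MsgSender {

    @Autowired
    private KafkaTemplate<String, MessageDTO> kafkaTemplate;

    public void send(MessageDTO message) throws Exception {
        // the caller still targets Constant.TOPIC ("msgTopic") ...
        SendResult<String, MessageDTO> result = kafkaTemplate.send(Constant.TOPIC, message).get();
        // ... but the interceptor's onSend() has prefixed it, so the record lands on "test_msgTopic"
        System.out.println("record written to topic: " + result.getRecordMetadata().topic());
    }
}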

springboot-mq-idempotent-consume/src/main/java/com/github/lybgeek/kafka/constant/Constant.java

Lines changed: 2 additions & 0 deletions
@@ -3,4 +3,6 @@

 public class Constant {
   public static final String TOPIC = "msgTopic";
+
+  public static final String TOPIC_KEY_PREFIX = "test_";
 }
springboot-mq-idempotent-consume/src/main/java/com/github/lybgeek/kafka/consumer/factory/KafkaListenerFactoryBeanPostProcesser.java

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
package com.github.lybgeek.kafka.consumer.factory;

import com.github.lybgeek.kafka.constant.Constant;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.scanners.MethodParameterScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Rewrites @KafkaListener topics before
 * {@link KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization(java.lang.Object, java.lang.String)}
 * reads them.
 */
@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

        for (String packageName : packageNames) {
            Reflections reflections = new Reflections(new ConfigurationBuilder()
                .forPackages(packageName)                      // packages to scan
                .addScanners(new SubTypesScanner())            // subtype scanner
                .addScanners(new FieldAnnotationsScanner())    // field annotation scanner
                .addScanners(new MethodAnnotationsScanner())   // method annotation scanner
                .addScanners(new MethodParameterScanner())     // method parameter scanner
            );

            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
            if (!CollectionUtils.isEmpty(methodSet)) {
                for (Method method : methodSet) {
                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                    changeTopics(kafkaListener);
                }
            }
        }

    }

    private void changeTopics(KafkaListener kafkaListener) throws Exception {
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String, Object> memberValues = (Map<String, Object>) memberValuesField.get(invocationHandler);
        String[] topics = (String[]) memberValues.get("topics");
        System.out.println("topics before modification: " + Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
        }
        memberValues.put("topics", topics);
        System.out.println("topics after modification: " + Lists.newArrayList(kafkaListener.topics()));
    }
}
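The changeTopics() trick relies on the fact that annotations read at runtime are JDK dynamic proxies whose attribute values live in a private memberValues map inside the invocation handler; mutating that map changes what every later reader of the annotation sees. That is also why this runs in a BeanFactoryPostProcessor, i.e. before KafkaListenerAnnotationBeanPostProcessor registers the listener endpoints. Below is a self-contained sketch of the mechanism using a made-up MyTopic annotation; on Java 9+ it may additionally require --add-opens java.base/sun.reflect.annotation=ALL-UNNAMED because the handler class is JDK-internal.

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

public class AnnotationRewriteDemo {

    @Retention(RetentionPolicy.RUNTIME)
    @interface MyTopic {
        String value();
    }

    @MyTopic("msgTopic")
    static class Target {
    }

    public static void main(String[] args) throws Exception {
        MyTopic annotation = Target.class.getAnnotation(MyTopic.class);
        System.out.println(annotation.value());   // msgTopic

        // annotations are dynamic proxies; their attribute values sit in a
        // private "memberValues" map inside the invocation handler
        InvocationHandler handler = Proxy.getInvocationHandler(annotation);
        Field field = handler.getClass().getDeclaredField("memberValues");
        field.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, Object> memberValues = (Map<String, Object>) field.get(handler);
        memberValues.put("value", "test_msgTopic");

        System.out.println(annotation.value());   // test_msgTopic
    }
}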
springboot-mq-idempotent-consume/src/main/java/com/github/lybgeek/kafka/producer/interceptor/KafkaProducerInterceptor.java

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
package com.github.lybgeek.kafka.producer.interceptor;

import com.alibaba.fastjson.JSON;
import com.github.lybgeek.kafka.dto.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

import static com.github.lybgeek.kafka.constant.Constant.TOPIC_KEY_PREFIX;


@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {

    /**
     * Runs on the user's main thread; called before the record is serialized.
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
        log.info("onSend, originalRecord: {}", JSON.toJSONString(record));
        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
            record.partition(), record.timestamp(), record.key(), record.value());
    }

    /**
     * Called before the record is acknowledged or when the send fails, normally before the
     * producer callback fires; runs on the producer's I/O thread.
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    /**
     * Clean-up work.
     */
    @Override
    public void close() {
    }

    /**
     * Initialization work.
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
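A quick way to observe the rewrite without a broker is to call onSend() directly, as in the hedged sketch below; constructing MessageDTO with a no-argument constructor is an assumption made for illustration. Note also that the ProducerRecord constructor used in onSend() does not carry over headers from the original record, so any headers set by the caller are dropped by the rewrite.

package com.github.lybgeek.kafka.producer.interceptor;

import com.github.lybgeek.kafka.constant.Constant;
import com.github.lybgeek.kafka.dto.MessageDTO;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerInterceptorSketch {

    public static void main(String[] args) {
        KafkaProducerInterceptor interceptor = new KafkaProducerInterceptor();
        // the caller addresses the logical topic "msgTopic"
        ProducerRecord<String, MessageDTO> original =
                new ProducerRecord<>(Constant.TOPIC, new MessageDTO());   // assumed no-arg constructor
        ProducerRecord<String, MessageDTO> rewritten = interceptor.onSend(original);
        // the interceptor has prefixed the destination
        System.out.println(rewritten.topic());   // prints "test_msgTopic"
    }
}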

springboot-mq-idempotent-consume/src/main/resources/application.yml

Lines changed: 3 additions & 0 deletions
@@ -33,6 +33,9 @@ spring:
       # acks=1: the producer receives a success response from the server as soon as the cluster's leader node has received the message.
       # acks=all: the producer only receives a success response once every node involved in replication has received the message.
       acks: 1
+      # producer interceptor configuration
+      properties:
+        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
     consumer:
       # auto-commit interval; in Spring Boot 2.x this value is a Duration and must follow the expected format, e.g. 1S, 1M, 2H, 5D
       auto-commit-interval: 1S
