Skip to content

studeyang/easykafka

Repository files navigation

Deutsch | English | Español | français | 日本語 | 한국어 | Português | Русский | 中文

EasyKafka

基于 Spring Kafka 的轻量级增强框架,让消息的发送与接收更简单。

目录


一、简介

EasyKafka 是基于 Spring Kafka 的增强框架,原 Spring Kafka 所有功能完全适配。

核心目标:使消息的发送和接收更简单。

主要解决以下问题:

  • 简化发送流程:一行代码发送消息,无需手动构建 ProducerRecord
  • 简化消费接入:通过 @EventHandler + @KafkaHandler 注解即可完成消费者注册
  • 统一消息重试:内置 3 次重试,指数退避策略(100ms → 5000ms),无需手动处理
  • 多集群支持:在同一应用中同时接入多个 Kafka 集群
  • 灰度路由:自动区分基线/灰度环境,消息精准路由到对应集群

二、开发环境

依赖 版本
Java 1.8+
Maven 3.6.3+
Spring Boot 2.4.3+

三、快速开始

3.1 发送消息

完整示例:EventPublisherTest

1. 引入依赖

<dependency>
    <groupId>io.github.studeyang</groupId>
    <artifactId>easykafka-spring-boot-starter</artifactId>
    <version>${easykafka.version}</version>
</dependency>

2. 配置 Kafka 集群

easykafka:
  init:
    kafkaCluster:
      - cluster: send
        brokers: kafka.example.com:9092

3. 定义消息

import io.github.open.easykafka.client.annotation.Topic;
import io.github.open.easykafka.client.message.Event;

@Topic(cluster = "send", name = "order-topic")
public class OrderCreatedEvent extends Event {
    private String orderId;
    private String userId;
    // getter/setter ...
}

4. 发送消息

import io.github.open.easykafka.client.EventPublisher;

OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId("ORD-123");
event.setUserId("USER-456");

EventPublisher.publish(event);

3.2 消费消息

完整示例:MultiMethodEventHandler

1. 引入依赖

<!-- easykafka -->
<dependency>
    <groupId>io.github.studeyang</groupId>
    <artifactId>easykafka-spring-boot-starter</artifactId>
    <version>${easykafka.version}</version>
</dependency>

<!-- 消息定义包(与生产者共享) -->
<dependency>
    <groupId>io.github.studeyang</groupId>
    <artifactId>example-sdk</artifactId>
    <version>${sdk.version}</version>
</dependency>

2. 实现消费逻辑

import io.github.open.easykafka.client.annotation.EventHandler;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.stereotype.Service;

@Service
@EventHandler(cluster = "send", topics = "order-topic")
public class OrderEventHandler {

    @KafkaHandler
    public void handle(OrderCreatedEvent event) {
        System.out.println("收到订单创建事件: " + event.getOrderId());
    }
}

四、核心功能

4.1 消息类型

EasyKafka 提供三种基础消息类型,所有消息类型均继承自 AbstractMessage

类型 说明 使用场景
Event 标准领域事件 业务事件通知(订单创建、支付完成等)
Broadcast 广播消息 一对多通知(配置变更、缓存刷新等)
DataSync 数据同步消息 跨服务数据同步

框架在发送前自动填充以下字段:

字段 说明
messageId 唯一消息 ID(UUID)
messageKey Kafka Record Key(来自 @MessageKey 注解字段)
messageHeader Kafka Headers(来自 @MessageHeader 注解字段)
messageTag 路由标签(BASE / GRAY)
messageService 来源服务名
messageTopic Topic 元数据(集群名 + Topic 名)
messageCreateTime 消息创建时间

4.2 消息注解

@Topic

标注在消息类上,声明消息所属的 Kafka 集群和 Topic 名称。

@Topic(cluster = "send", name = "order-topic")
public class OrderCreatedEvent extends Event {
    // ...
}
属性 类型 必填 说明
cluster String 逻辑集群名称,需与配置中的 cluster 对应
name String Kafka Topic 名称

@MessageKey

标注在消息类的字段上,将该字段的值作为 Kafka Record Key,用于消息分区路由。

@Topic(cluster = "send", name = "order-topic")
public class OrderCreatedEvent extends Event {
    @MessageKey
    private String orderId;  // 同一 orderId 的消息路由到同一分区
}

@MessageHeader

标注在消息类的字段上,将该字段的值写入 Kafka Record Header,消费端可通过 @Header 获取。

@Topic(cluster = "send", name = "order-topic")
public class OrderCreatedEvent extends Event {
    @MessageHeader
    private String requestId;

    @MessageHeader
    private String traceId;
}

消费端读取 Header:

@KafkaHandler
public void handle(OrderCreatedEvent event,
                   @Header(value = "requestId", required = false) String requestId,
                   @Header(value = "traceId", required = false) String traceId) {
    log.info("requestId={}, traceId={}", requestId, traceId);
}

@EventHandler

标注在消费者类上,声明该类为 Kafka 消息处理器。底层是对 @KafkaListener 的封装。

@Service
@EventHandler(
    cluster = "send",
    topics = "order-topic",
    concurrency = "3"
)
public class OrderEventHandler {

    @KafkaHandler
    public void handleCreated(OrderCreatedEvent event) { ... }

    @KafkaHandler
    public void handleCancelled(OrderCancelledEvent event) { ... }
}
属性 类型 必填 默认值 说明
cluster String - 消费的 Kafka 集群名称
topics String - 订阅的 Topic 名称
groupId String 自动生成 消费者组 ID,默认为 {groupIdPrefix}{topics}
concurrency String "1" 并发消费线程数
id String 自动生成 Listener 容器 ID

4.3 发送回调

EventPublisher.publish 支持传入回调,在发送成功/失败后异步执行。

完整回调(MessageCallback)

EventPublisher.publish(event, new MessageCallback() {
    @Override
    public void onSuccess() {
        log.info("消息发送成功");
    }

    @Override
    public void onFail(Exception e) {
        log.error("消息发送失败", e);
        // 可在此处做补偿操作
    }
});

仅关注成功(SuccessCallback)

EventPublisher.publish(event, new SuccessCallback() {
    @Override
    public void onSuccess() {
        log.info("消息发送成功");
    }
});

仅关注失败(FailCallback)

EventPublisher.publish(event, new FailCallback() {
    @Override
    public void onFail(Exception e) {
        log.error("消息发送失败,进行告警", e);
    }
});

批量发送

List<OrderCreatedEvent> events = buildEvents();
EventPublisher.publish(events);

4.4 泛型发送(ObjectPublisher)

当消息对象没有继承 Event,或需要动态指定 Topic 时,使用 ObjectPublisher

指定 Topic 发送

ObjectPublisher.publish(myObject, new TopicMetadata("send", "custom-topic"));

使用完整元数据发送

MessageMetadata metadata = new MessageMetadataBuilder()
    .topicMetadata("send", "custom-topic")
    .messageKey("key-123")
    .messageHeader("requestId", "req-456")
    .messageTag(Tag.GRAY)
    .build();

ObjectPublisher.publish(myObject, metadata);

带回调发送

ObjectPublisher.publish(myObject, metadata, new MessageCallback() {
    public void onSuccess() { ... }
    public void onFail(Exception e) { ... }
});

4.5 多集群与灰度路由

EasyKafka 支持在同一应用中同时接入多个 Kafka 集群,并内置灰度路由能力。

多集群配置

easykafka:
  init:
    kafkaCluster:
      - cluster: order     # 订单集群
        brokers: order-kafka.example.com:9092
      - cluster: payment   # 支付集群
        brokers: pay-kafka.example.com:9092

灰度路由配置

为集群配置 tag: gray 的灰度节点,框架会根据当前环境自动路由:

easykafka:
  init:
    kafkaCluster:
      - cluster: send
        brokers: kafka-prod.example.com:9092       # 基线环境
      - cluster: send
        brokers: kafka-gray.example.com:9092
        tag: gray                                   # 灰度环境

提示:如果未配置灰度节点,框架会自动将基线配置复制一份作为灰度配置,不会影响正常启动。

手动指定路由标签

// 强制路由到灰度集群
MessageMetadata metadata = new MessageMetadataBuilder()
    .topicMetadata("send", "order-topic")
    .messageTag(Tag.GRAY)
    .build();

ObjectPublisher.publish(event, metadata);

4.6 自动重试

发送失败时框架自动重试,无需业务代码干预:

参数
最大重试次数 3 次
初始等待时间 100ms
退避倍数 2x
最大等待时间 5000ms
重试策略 指数退避

五、配置参考

5.1 集群配置

easykafka:
  init:
    kafkaCluster:
      - cluster: send              # 逻辑集群名称(与 @Topic/@EventHandler 对应)
        brokers: 127.0.0.1:9092    # Broker 地址,多个用逗号分隔
        tag: BASE                  # 标签:BASE(默认)或 GRAY
属性 类型 必填 默认值 说明
cluster String - 逻辑集群标识符
brokers String - Broker 地址列表,格式 host:port
tag Tag BASE 环境标签(BASE / GRAY)

5.2 生产者运行时配置

easykafka:
  runtime:
    producer:
      partitionSize: 500           # 批量发送时的分区大小
      async:
        corePoolSize: 3            # 核心线程数
        maxPoolSize: 5             # 最大线程数
        keepAliveSeconds: 60       # 空闲线程存活时间(秒)
        queueCapacity: 100         # 队列容量
        rejectedHandler: java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
        threadNamePrefix: kafka-async-producer-
属性 默认值 说明
partitionSize - 批量发送时的分区大小
async.corePoolSize 3 异步线程池核心线程数
async.maxPoolSize 5 异步线程池最大线程数
async.keepAliveSeconds 60 线程空闲存活时间
async.queueCapacity 100 任务队列容量
async.rejectedHandler CallerRunsPolicy 拒绝策略(全限定类名)
async.threadNamePrefix kafka-async-producer- 线程名前缀

5.3 消费者运行时配置

easykafka:
  runtime:
    consumer:
      groupIdPrefix: "my-service-" # 消费者组 ID 前缀
属性 默认值 说明
groupIdPrefix - 自动生成 Group ID 的前缀,最终 Group ID = {groupIdPrefix}{topicName}

5.4 完整配置示例

spring:
  application:
    name: order-service

easykafka:
  init:
    kafkaCluster:
      - cluster: order
        brokers: order-kafka.example.com:9092
        tag: BASE
      - cluster: order
        brokers: order-kafka-gray.example.com:9092
        tag: GRAY
      - cluster: payment
        brokers: pay-kafka.example.com:9092
    producer:
      - beanName: orderProducer
        config:
          retries: 3
          batch.size: 32768
          linger.ms: 5
          acks: all
    consumer:
      - beanName: paymentConsumer
        cluster: payment
        config:
          session.timeout.ms: 30000

  runtime:
    producer:
      partitionSize: 1000
      async:
        corePoolSize: 5
        maxPoolSize: 10
        keepAliveSeconds: 120
        queueCapacity: 500
    consumer:
      groupIdPrefix: "order-service-"

六、架构说明

发送流程

EventPublisher.publish(event)
    │
    ▼
DefaultMessagePublisher(异步线程池)
    │
    ▼
SendMessageConverter(Event → SendMessage)
    │
    ▼
Sender 责任链(装饰器模式)
    ├── ReportableSender(记录发送结果日志)
    ├── RetryableSender(失败重试,最多 3 次,指数退避)
    └── ProducerSender(构建 ProducerRecord,调用 Kafka)
    │
    ▼
ProducerContainer.getProducer(cluster, tag)
    │
    ▼
StringKafkaProducer → Kafka Broker
    │
    ▼
MessageCallback.onSuccess() / onFail()

消费流程

Kafka Broker
    │
    ▼
@EventHandler(标注在消费者类上)
    │
    ▼
EventHandlerAnnotationBeanPostProcessor
(发现 Handler,注册元数据到 ListenerContainer)
    │
    ▼
KafkaListenerContainerFactoryRegistrar
(为每个集群创建 ConcurrentKafkaListenerContainerFactory
  命名规则:{cluster}KafkaListenerContainerFactory
  AckMode:RECORD,自定义反序列化器)
    │
    ▼
@KafkaHandler 方法(根据消息类型自动路由到对应方法)

模块结构

easykafka/
├── easykafka-client/             # 核心模块:生产者/消费者逻辑与注解
├── easykafka-spring-boot-starter/ # Spring Boot 自动配置入口
├── example-sdk/                  # 消息定义包(模拟真实 SDK jar)
├── example-producer/             # 生产者示例应用
└── example-consumer/             # 消费者示例应用

七、构建与测试

# 构建所有模块
mvn clean install

# 运行所有测试
mvn clean test

# 运行指定模块的测试
mvn clean test -f easykafka-client/pom.xml

# 运行指定测试类
mvn test -Dtest=JsonUtilsTest

# 运行指定测试方法
mvn test -Dtest=JsonUtilsTest#someMethod

# 打包(CI 环境使用)
mvn -B package --file pom.xml

About

Make kafka easier to use.(基于 Spring Kafka 增强,让消息收发更简单)

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages