Skip to content

jinpengqiong/message-queue-learning

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

🎓 消息队列学习项目 - 从零实现

这是一个全面的消息队列学习项目,通过使用Go语言从零开始实现一个消息队列系统,帮助你深入理解消息队列的原理、设计和应用场景。

📚 项目目标

通过完成这个项目,你将掌握:

  • ✅ 消息队列的核心原理和设计模式
  • 并发编程的最佳实践(sync.Mutex、sync.Cond、goroutine)
  • ✅ 消息队列的典型使用场景和实现细节
  • ✅ 从简单到复杂的逐步学习路径
  • ✅ 与Kafka、RabbitMQ等生产级MQ的设计对比

🏗️ 项目架构

项目分为5个递进式阶段,每个阶段都是前一个阶段的升级:

message-queue-learning/
├── phase1-basic/           # ✅ 第一阶段:基础单机队列(已完成)
│   ├── queue.go            # 核心队列实现
│   ├── example_basic.go    # 7个使用示例
│   ├── main_test.go        # 14个单元测试 + 基准测试
│   └── main.go             # 阶段主程序
│
├── phase2-persistence/     # ⏳ 第二阶段:消息持久化(开发中)
├── phase3-reliability/     # ⏳ 第三阶段:可靠性保证(开发中)
├── phase4-distributed/     # ⏳ 第四阶段:分布式支持(开发中)
├── phase5-advanced/        # ⏳ 第五阶段:高级功能(开发中)
│
├── main.go                 # 项目入口
├── go.mod                  # Go模块定义
└── README.md               # 本文件

🚀 快速开始

环境要求

  • Go 1.21 或更高版本
  • macOS / Linux / Windows

安装

cd /Users/peng.jin/Desktop/message-queue-learning

运行示例

运行第一阶段所有示例

go run main.go -phase=1

运行第一阶段的单个示例

go run main.go -phase=1 -example=1    # 基础生产者-消费者
go run main.go -phase=1 -example=2    # 多生产者多消费者
go run main.go -phase=1 -example=3    # 非阻塞消费
go run main.go -phase=1 -example=4    # 队列监控
go run main.go -phase=1 -example=5    # 优雅关闭
go run main.go -phase=1 -example=6    # 错误处理
go run main.go -phase=1 -example=7    # 突发流量处理

运行测试

运行所有单元测试

go test ./phase1-basic -v

运行性能基准测试

go test ./phase1-basic -bench=. -benchmem

运行特定测试

go test ./phase1-basic -run TestQueueBasicOperations -v
go test ./phase1-basic -run BenchmarkQueueProduce -bench=.

📖 第一阶段详解:基础单机队列

阶段目标

理解消息队列的最核心概念:

  1. FIFO 数据结构 - 先进先出的消息存储
  2. 生产者-消费者模式 - 异步解耦的核心思想
  3. 并发安全 - 使用互斥锁和条件变量
  4. 阻塞vs非阻塞消费 - 两种消费模式的权衡
  5. 优雅关闭 - 完整的生命周期管理

核心API

// 创建队列
queue := phase1.NewQueue("order-queue")

// 发送消息(阻塞安全)
msgID, err := queue.Produce("订单001")

// 阻塞消费(等待消息)
msg, ok := queue.Consume()      // ok=false 表示队列已关闭
if ok {
    fmt.Println(msg.Body)
}

// 非阻塞消费(立即返回)
msg, ok := queue.TryConsume()   // ok=false 表示队列为空

// 查看队列头部(不移除)
msg := queue.Peek()

// 获取队列大小
size := queue.Size()

// 获取统计信息
stats := queue.GetStats()

// 优雅关闭
queue.Close()

示例讲解

示例1:基础生产者-消费者

场景:订单处理系统

  • 生产者:持续生成订单
  • 消费者:处理订单
  • 学习点:基本的并发操作

示例2:多生产者多消费者

场景:日志收集系统

  • 3个生产者:3个应用产生日志
  • 2个消费者:2个日志处理器
  • 学习点:并发安全性和负载分配

示例3:非阻塞消费

场景:轮询任务处理

  • 消费者不想阻塞等待
  • 定期检查是否有新消息
  • 学习点:TryConsume() 的用法

示例4:队列监控

场景:实时监控队列状态

  • 同时产生和消费消息
  • 实时显示队列深度
  • 学习点:队列统计和状态获取

示例5:优雅关闭

场景:应用正常退出

  • 停止接收新消息
  • 等待所有消息处理完成
  • 安全退出
  • 学习点:生命周期管理

示例6:错误处理

场景:处理异常情况

  • 向关闭的队列发送消息
  • 重复关闭
  • 学习点:错误处理策略

示例7:突发流量处理

场景:流量削峰

  • 短时间内发送大量请求
  • 缓慢稳定处理
  • 队列缓冲流量
  • 学习点:消息队列的核心优势

关键代码分析

并发同步原语

type Queue struct {
    messages []*Message       // 消息切片
    mu       sync.Mutex       // 保护 messages
    cond     *sync.Cond       // 条件变量
    closed   bool             // 关闭标志
}

// 初始化时绑定互斥锁
q.cond = sync.NewCond(&q.mu)

为什么这样设计?

  • sync.Mutex 保护共享数据 messages
  • sync.Cond 实现消费者等待信号
  • 当生产者添加消息时,通过 Signal() 唤醒消费者
  • 当队列关闭时,通过 Broadcast() 唤醒所有等待者

阻塞消费的实现

func (q *Queue) Consume() (*Message, bool) {
    q.mu.Lock()
    defer q.mu.Unlock()

    // 使用 for 循环而不是 if(防止虚假唤醒)
    for len(q.messages) == 0 && !q.closed {
        q.cond.Wait()  // 释放锁并等待信号
    }

    if len(q.messages) == 0 && q.closed {
        return nil, false
    }

    msg := q.messages[0]
    q.messages = q.messages[1:]
    return msg, true
}

关键点

  • 使用 for 循环检查条件(while循环语义)
  • Wait() 自动释放锁然后等待
  • 被唤醒后自动重新获取锁

优雅关闭

func (q *Queue) Close() error {
    q.mu.Lock()
    defer q.mu.Unlock()

    if q.closed {
        return fmt.Errorf("队列已经关闭")
    }

    q.closed = true
    q.cond.Broadcast()  // 唤醒所有等待的消费者
    return nil
}

为什么使用 Broadcast()

  • 可能有多个消费者在等待
  • 需要全部唤醒,让它们检查关闭状态

性能基准

运行基准测试查看性能指标:

go test ./phase1-basic -bench=BenchmarkQueueProduce -benchmem
go test ./phase1-basic -bench=BenchmarkQueueConsume -benchmem

典型结果(取决于系统配置):

  • 生产: 每秒 ~1,000,000 操作
  • 消费: 每秒 ~500,000 操作(包括切片操作)

常见问题

Q1: 为什么使用 sync.Cond 而不是 channel?

  • Channel 是 Go 推荐的,但这个项目是从零学习,用 Cond 便于理解底层机制
  • Channel 底层也是基于 Cond 实现

Q2: 消息在哪里存储?

  • 在内存中(使用 Go 切片)
  • 重新分配时会产生GC压力
  • 第二阶段会添加磁盘持久化

Q3: 队列容量有限制吗?

  • 当前无限制(仅受系统内存限制)
  • 实际系统会设置最大队列深度

Q4: 如何处理消息丢失?

  • 当前阶段不能,消息只在内存中
  • 第二阶段会实现持久化
  • 第三阶段会实现确认机制

🎯 学习路线图

第一阶段:基础单机队列 ✅

学习内容

  • 消息队列基本概念
  • FIFO 队列实现
  • 并发安全的实现方式
  • 生产者-消费者模式

预计时间:1-2天

完成标志

  • 理解所有7个示例
  • 通过所有14个单元测试
  • 能够独立实现一个简单的队列

第二阶段:消息持久化 ⏳

学习内容

  • WAL (Write Ahead Log) 写入模式
  • 文件存储和索引
  • 崩溃恢复机制
  • 序列化和反序列化

核心功能

// 持久化设置
queue := phase2.NewPersistentQueue("order-queue", "./data")

// 消息会自动持久化到磁盘
msgID, _ := queue.Produce("订单001")

// 重启后自动恢复

预计时间:2-3天


第三阶段:可靠性保证 ⏳

学习内容

  • 消息确认机制(ACK)
  • 消息重试策略
  • 死信队列
  • 消费者偏移量管理
  • 顺序性保证

核心功能

// 消息确认
msg, _ := queue.Consume()
// ... 处理消息 ...
queue.Acknowledge(msg.ID)  // 确认处理

// 重试机制
queue.SetMaxRetries(3)
queue.SetRetryDelay(5 * time.Second)

// 死信队列
dlq := queue.GetDeadLetterQueue()
dlq.GetMessages()

预计时间:2-3天


第四阶段:分布式支持 ⏳

学习内容

  • 消费者分组(Consumer Group)
  • 负载均衡和分区
  • Rebalance 机制
  • 简单的集群协调

核心功能

// 消费者分组
group := phase4.NewConsumerGroup("payment-group")
queue.Subscribe(group)

// 自动负载均衡
group.AddConsumer("consumer-1")
group.AddConsumer("consumer-2")

// 消费者在消费分区消息
msg := consumer.Consume()

预计时间:3-4天


第五阶段:高级功能 ⏳

学习内容

  • 优先级队列
  • 延迟消息
  • 事务支持(最终一致性)
  • 消息过滤和路由
  • 监控和指标收集

核心功能

// 优先级消息
queue.ProduceWithPriority("VIP订单", 10)
queue.ProduceWithPriority("普通订单", 1)

// 延迟消息
queue.ProduceDelayed("超时取消", 30*time.Minute)

// 事务支持
tx := queue.BeginTransaction()
tx.Produce("消息1")
tx.Produce("消息2")
tx.Commit()  // 全部成功或全部失败

预计时间:2-3天


📚 与生产级MQ的对比

Kafka 设计对标

功能 第一阶段 第二阶段 第三阶段 第四阶段 第五阶段 Kafka
基本队列
消息持久化
可靠性保证
分布式
优先级/延迟

RabbitMQ 设计对标

功能 第一阶段 第二阶段 第三阶段 第四阶段 第五阶段 RabbitMQ
队列
消息持久化
消息确认
死信队列
路由和过滤

🧪 测试覆盖

第一阶段的测试覆盖:

  • ✅ 基本操作测试(Produce/Consume)
  • ✅ 并发安全测试
  • ✅ 空队列处理
  • ✅ 队列关闭测试
  • ✅ 错误处理测试
  • ✅ 性能基准测试
  • ✅ 消息属性验证

运行所有测试:

go test ./phase1-basic -v -cover

📖 学习建议

初学者学习路径

  1. 第1天

    • 阅读本README中的"第一阶段详解"
    • 运行示例1和示例2,理解基本概念
    • 阅读 queue.go 中的核心实现
  2. 第2天

    • 运行所有7个示例
    • 运行单元测试
    • 尝试修改代码,加深理解
  3. 第3天

    • 尝试独立实现一个简单的队列
    • 对比你的实现和项目代码的差异
    • 思考改进方案

深度学习路径

  1. 并发编程

    • 深入理解 sync.Mutexsync.Cond
    • 研究虚假唤醒问题
    • 性能优化
  2. 数据结构

    • 研究不同的队列实现(环形缓冲、链表等)
    • 性能对比
    • 内存管理
  3. 分布式系统

    • 研究一致性问题
    • 故障转移机制
    • 性能优化

🔧 开发指南

代码风格

项目使用以下代码风格:

  • 遵循 Go 官方 Effective Go
  • 使用 gofmt 进行格式化
  • 详细的中文注释解释关键概念

文件结构

每个阶段都遵循相同的结构:

  • queue.go - 核心实现(必须)
  • example_*.go - 使用示例(必须)
  • *_test.go - 单元测试(必须)
  • main.go - 阶段入口(可选)
  • README.md - 阶段文档(推荐)

📝 许可证

MIT License

🤝 反馈

如有问题或建议,欢迎提出!


Happy Learning! 🚀

这个项目旨在帮助你从零开始理解消息队列,通过逐步的实现和学习,最终掌握复杂的分布式系统概念。

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages