基于Kafka的延迟队列

10阅读 0评论2025-08-27 aquester
分类:架构设计与优化

1.  实现原理

通过topic区分不同的延迟时长,每个topic对于一个延迟,比如 topic100 仅存储延迟 100ms 的消息,topic1000 仅存储延迟 1s 的消息,依次类推。




生产消息时,消息需按延迟时长投递到对应的topic。消费消息时,检查消息的时间,如果未到达延迟时长,则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列,可作为一种特殊的延迟队列,比如延迟 3600000ms 的处理。

2.  消费者实现


点击(此处)折叠或打开

  1. // delay_consumer.go
  2. package main
  3. import (
  4. "context"
  5. "time"
  6. "github.com/IBM/sarama"
  7. "github.com/sirupsen/logrus"
  8. )
  9. // 定义每个topic对应的延迟时间(ms)
  10. var topicDelayConfig = map[string]time.Duration{
  11. "delay-100ms": 100 * time.Millisecond,
  12. "delay-200ms": 200 * time.Millisecond,
  13. "delay-500ms": 500 * time.Millisecond,
  14. "delay-1000ms": 1000 * time.Millisecond,
  15. }
  16. type delayConsumerHandler struct {
  17. // 可以添加必要的依赖,如业务处理器等
  18. }
  19. func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {
  20. logrus.Info("延迟队列消费者初始化完成")
  21. return nil
  22. }
  23. func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
  24. logrus.Info("延迟队列消费者清理完成")
  25. return nil
  26. }
  27. // ConsumeClaim 处理分区消息,实现延迟逻辑
  28. func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  29. topic := claim.Topic()
  30. delay, exists := topicDelayConfig[topic]
  31. if !exists {
  32. logrus.Errorf("topic %s 未配置延迟时间,跳过消费", topic)
  33. // 标记所有消息为已消费,避免重复处理
  34. for range claim.Messages() {
  35. sess.MarkMessage(msg, "")
  36. }
  37. return nil
  38. }
  39. // 按顺序处理消息(假设消息时间有序)
  40. for msg := range claim.Messages() {
  41. // 检查会话是否已关闭(如重平衡发生)
  42. select {
  43. case <-sess.Context().Done():
  44. logrus.Info("会话已关闭,停止消费")
  45. return nil
  46. default:
  47. }
  48. // 计算需要延迟的时间
  49. // 消息应该被处理的时间 = 消息产生时间 + 主题延迟时间
  50. produceTime := msg.Timestamp
  51. processTime := produceTime.Add(delay)
  52. now := time.Now()
  53. // 如果当前时间未到处理时间,计算需要休眠的时间
  54. if now.Before(processTime) {
  55. sleepDuration := processTime.Sub(now)
  56. logrus.Debugf(
  57. "消息需要延迟处理,topic=%s, offset=%d, 需等待 %v (产生时间: %v, 预计处理时间: %v)",
  58. topic, msg.Offset, sleepDuration, produceTime, processTime,
  59. )
  60. // 休眠期间监听会话关闭信号,避免阻塞重平衡
  61. select {
  62. case <-sess.Context().Done():
  63. logrus.Info("休眠期间会话关闭,停止消费")
  64. return nil
  65. case <-time.After(sleepDuration):
  66. // 休眠完成,继续处理
  67. }
  68. }
  69. // 延迟时间已到,处理消息
  70. h.processMessage(msg)
  71. // 标记消息为已消费
  72. sess.MarkMessage(msg, "")
  73. }
  74. return nil
  75. }
  76. // 实际业务处理逻辑
  77. func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {
  78. logrus.Infof(
  79. "处理延迟消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 产生时间=%v",
  80. msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,
  81. )
  82. // 这里添加实际的业务处理代码
  83. }
  84. // 初始化消费者示例
  85. func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
  86. config := sarama.NewConfig()
  87. config.Version = sarama.V2_8_1_0 // 指定Kafka版本
  88. config.Consumer.Return.Errors = true
  89. config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
  90. // 确保消息的Timestamp是创建时间(需要Kafka broker配置支持)
  91. config.Consumer.Fetch.Min = 1
  92. config.Consumer.Fetch.Default = 1024 * 1024
  93. return sarama.NewConsumerGroup(brokers, groupID, config)
  94. }
  95. func main() {
  96. brokers := []string{"localhost:9092"}
  97. groupID := "delay-queue-group"
  98. topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}
  99. consumer, err := newDelayConsumer(brokers, groupID)
  100. if err != nil {
  101. logrus.Fatalf("创建消费者失败: %v", err)
  102. }
  103. defer consumer.Close()
  104. handler := &delayConsumerHandler{}
  105. ctx := context.Background()
  106. // 持续消费
  107. for {
  108. if err := consumer.Consume(ctx, topics, handler); err != nil {
  109. logrus.Errorf("消费出错: %v", err)
  110. // 简单重试逻辑
  111. time.Sleep(5 * time.Second)
  112. }
  113. }
  114. }

3.  生产者实现


点击(此处)折叠或打开

  1. // delay_producer.go
  2. package main
  3. import (
  4. "errors"
  5. "time"
  6. "github.com/IBM/sarama"
  7. "github.com/sirupsen/logrus"
  8. )
  9. // 定义允许的延迟时长(毫秒)及其对应的Topic
  10. var allowedDelays = map[time.Duration]string{
  11. 100 * time.Millisecond: "delay-100ms",
  12. 200 * time.Millisecond: "delay-200ms",
  13. 500 * time.Millisecond: "delay-500ms",
  14. 1000 * time.Millisecond: "delay-1000ms",
  15. // 可根据需要添加更多允许的延迟时长
  16. }
  17. // DelayProducer 延迟消息生产者
  18. type DelayProducer struct {
  19. producer sarama.SyncProducer
  20. }
  21. // NewDelayProducer 创建延迟消息生产者实例
  22. func NewDelayProducer(brokers []string) (*DelayProducer, error) {
  23. config := sarama.NewConfig()
  24. config.Version = sarama.V2_8_1_0 // 匹配Kafka版本
  25. config.Producer.RequiredAcks = sarama.WaitForAll
  26. config.Producer.Retry.Max = 3
  27. config.Producer.Return.Successes = true
  28. producer, err := sarama.NewSyncProducer(brokers, config)
  29. if err != nil {
  30. return nil, err
  31. }
  32. return &DelayProducer{
  33. producer: producer,
  34. }, nil
  35. }
  36. // SendDelayMessage 发送延迟消息
  37. // 参数:
  38. // - key: 消息键
  39. // - value: 消息内容
  40. // - delay: 延迟时长
  41. // 返回:
  42. // - 消息的分区和偏移量
  43. // - 错误信息(若延迟不合法或发送失败)
  44. func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {
  45. // 1. 校验延迟时长是否合法
  46. topic, ok := allowedDelays[delay]
  47. if !ok {
  48. return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")
  49. }
  50. // 2. 创建消息,设置当前时间为消息时间戳(供消费者计算延迟)
  51. msg := &sarama.ProducerMessage{
  52. Topic: topic,
  53. Key: sarama.ByteEncoder(key),
  54. Value: sarama.ByteEncoder(value),
  55. Timestamp: time.Now(), // 记录消息发送时间,用于消费者计算处理时间
  56. }
  57. // 3. 发送消息
  58. partition, offset, err = p.producer.SendMessage(msg)
  59. if err != nil {
  60. logrus.Errorf("发送延迟消息失败: %v, 延迟时长: %v", err, delay)
  61. return 0, 0, err
  62. }
  63. logrus.Infof("发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v",
  64. topic, partition, offset, delay)
  65. return partition, offset, nil
  66. }
  67. // Close 关闭生产者
  68. func (p *DelayProducer) Close() error {
  69. return p.producer.Close()
  70. }
  71. // 使用示例
  72. func main() {
  73. // 初始化生产者
  74. producer, err := NewDelayProducer([]string{"localhost:9092"})
  75. if err != nil {
  76. logrus.Fatalf("初始化生产者失败: %v", err)
  77. }
  78. defer producer.Close()
  79. // 发送合法延迟消息
  80. _, _, err = producer.SendDelayMessage(
  81. []byte("test-key"),
  82. []byte("这是一条延迟消息"),
  83. 100*time.Millisecond, // 合法延迟
  84. )
  85. if err != nil {
  86. logrus.Error("发送消息失败:", err)
  87. }
  88. // 尝试发送非法延迟消息(会被拒绝)
  89. _, _, err = producer.SendDelayMessage(
  90. []byte("test-key"),
  91. []byte("这是一条非法延迟消息"),
  92. 300*time.Millisecond, // 不允许的延迟
  93. )
  94. if err != nil {
  95. logrus.Error("发送消息失败:", err) // 会输出非法延迟的错误
  96. }
  97. }



上一篇:执行“go mod tidy”遇到“misbehavior”错误
下一篇:没有了