1. 实现原理
通过topic区分不同的延迟时长,每个topic对于一个延迟,比如 topic100 仅存储延迟 100ms 的消息,topic1000 仅存储延迟 1s 的消息,依次类推。

生产消息时,消息需按延迟时长投递到对应的topic。消费消息时,检查消息的时间,如果未到达延迟时长,则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列,可作为一种特殊的延迟队列,比如延迟 3600000ms 的处理。
2. 消费者实现
点击(此处)折叠或打开
- // delay_consumer.go
- package main
- import (
- "context"
- "time"
- "github.com/IBM/sarama"
- "github.com/sirupsen/logrus"
- )
- // 定义每个topic对应的延迟时间(ms)
- var topicDelayConfig = map[string]time.Duration{
- "delay-100ms": 100 * time.Millisecond,
- "delay-200ms": 200 * time.Millisecond,
- "delay-500ms": 500 * time.Millisecond,
- "delay-1000ms": 1000 * time.Millisecond,
- }
- type delayConsumerHandler struct {
- // 可以添加必要的依赖,如业务处理器等
- }
- func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {
- logrus.Info("延迟队列消费者初始化完成")
- return nil
- }
- func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
- logrus.Info("延迟队列消费者清理完成")
- return nil
- }
- // ConsumeClaim 处理分区消息,实现延迟逻辑
- func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- topic := claim.Topic()
- delay, exists := topicDelayConfig[topic]
- if !exists {
- logrus.Errorf("topic %s 未配置延迟时间,跳过消费", topic)
- // 标记所有消息为已消费,避免重复处理
- for range claim.Messages() {
- sess.MarkMessage(msg, "")
- }
- return nil
- }
- // 按顺序处理消息(假设消息时间有序)
- for msg := range claim.Messages() {
- // 检查会话是否已关闭(如重平衡发生)
- select {
- case <-sess.Context().Done():
- logrus.Info("会话已关闭,停止消费")
- return nil
- default:
- }
- // 计算需要延迟的时间
- // 消息应该被处理的时间 = 消息产生时间 + 主题延迟时间
- produceTime := msg.Timestamp
- processTime := produceTime.Add(delay)
- now := time.Now()
- // 如果当前时间未到处理时间,计算需要休眠的时间
- if now.Before(processTime) {
- sleepDuration := processTime.Sub(now)
- logrus.Debugf(
- "消息需要延迟处理,topic=%s, offset=%d, 需等待 %v (产生时间: %v, 预计处理时间: %v)",
- topic, msg.Offset, sleepDuration, produceTime, processTime,
- )
- // 休眠期间监听会话关闭信号,避免阻塞重平衡
- select {
- case <-sess.Context().Done():
- logrus.Info("休眠期间会话关闭,停止消费")
- return nil
- case <-time.After(sleepDuration):
- // 休眠完成,继续处理
- }
- }
- // 延迟时间已到,处理消息
- h.processMessage(msg)
- // 标记消息为已消费
- sess.MarkMessage(msg, "")
- }
- return nil
- }
- // 实际业务处理逻辑
- func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {
- logrus.Infof(
- "处理延迟消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 产生时间=%v",
- msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,
- )
- // 这里添加实际的业务处理代码
- }
- // 初始化消费者示例
- func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
- config := sarama.NewConfig()
- config.Version = sarama.V2_8_1_0 // 指定Kafka版本
- config.Consumer.Return.Errors = true
- config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
- // 确保消息的Timestamp是创建时间(需要Kafka broker配置支持)
- config.Consumer.Fetch.Min = 1
- config.Consumer.Fetch.Default = 1024 * 1024
- return sarama.NewConsumerGroup(brokers, groupID, config)
- }
- func main() {
- brokers := []string{"localhost:9092"}
- groupID := "delay-queue-group"
- topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}
- consumer, err := newDelayConsumer(brokers, groupID)
- if err != nil {
- logrus.Fatalf("创建消费者失败: %v", err)
- }
- defer consumer.Close()
- handler := &delayConsumerHandler{}
- ctx := context.Background()
- // 持续消费
- for {
- if err := consumer.Consume(ctx, topics, handler); err != nil {
- logrus.Errorf("消费出错: %v", err)
- // 简单重试逻辑
- time.Sleep(5 * time.Second)
- }
- }
- }
3. 生产者实现
点击(此处)折叠或打开
- // delay_producer.go
- package main
- import (
- "errors"
- "time"
- "github.com/IBM/sarama"
- "github.com/sirupsen/logrus"
- )
- // 定义允许的延迟时长(毫秒)及其对应的Topic
- var allowedDelays = map[time.Duration]string{
- 100 * time.Millisecond: "delay-100ms",
- 200 * time.Millisecond: "delay-200ms",
- 500 * time.Millisecond: "delay-500ms",
- 1000 * time.Millisecond: "delay-1000ms",
- // 可根据需要添加更多允许的延迟时长
- }
- // DelayProducer 延迟消息生产者
- type DelayProducer struct {
- producer sarama.SyncProducer
- }
- // NewDelayProducer 创建延迟消息生产者实例
- func NewDelayProducer(brokers []string) (*DelayProducer, error) {
- config := sarama.NewConfig()
- config.Version = sarama.V2_8_1_0 // 匹配Kafka版本
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Retry.Max = 3
- config.Producer.Return.Successes = true
- producer, err := sarama.NewSyncProducer(brokers, config)
- if err != nil {
- return nil, err
- }
- return &DelayProducer{
- producer: producer,
- }, nil
- }
- // SendDelayMessage 发送延迟消息
- // 参数:
- // - key: 消息键
- // - value: 消息内容
- // - delay: 延迟时长
- // 返回:
- // - 消息的分区和偏移量
- // - 错误信息(若延迟不合法或发送失败)
- func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {
- // 1. 校验延迟时长是否合法
- topic, ok := allowedDelays[delay]
- if !ok {
- return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")
- }
- // 2. 创建消息,设置当前时间为消息时间戳(供消费者计算延迟)
- msg := &sarama.ProducerMessage{
- Topic: topic,
- Key: sarama.ByteEncoder(key),
- Value: sarama.ByteEncoder(value),
- Timestamp: time.Now(), // 记录消息发送时间,用于消费者计算处理时间
- }
- // 3. 发送消息
- partition, offset, err = p.producer.SendMessage(msg)
- if err != nil {
- logrus.Errorf("发送延迟消息失败: %v, 延迟时长: %v", err, delay)
- return 0, 0, err
- }
- logrus.Infof("发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v",
- topic, partition, offset, delay)
- return partition, offset, nil
- }
- // Close 关闭生产者
- func (p *DelayProducer) Close() error {
- return p.producer.Close()
- }
- // 使用示例
- func main() {
- // 初始化生产者
- producer, err := NewDelayProducer([]string{"localhost:9092"})
- if err != nil {
- logrus.Fatalf("初始化生产者失败: %v", err)
- }
- defer producer.Close()
- // 发送合法延迟消息
- _, _, err = producer.SendDelayMessage(
- []byte("test-key"),
- []byte("这是一条延迟消息"),
- 100*time.Millisecond, // 合法延迟
- )
- if err != nil {
- logrus.Error("发送消息失败:", err)
- }
- // 尝试发送非法延迟消息(会被拒绝)
- _, _, err = producer.SendDelayMessage(
- []byte("test-key"),
- []byte("这是一条非法延迟消息"),
- 300*time.Millisecond, // 不允许的延迟
- )
- if err != nil {
- logrus.Error("发送消息失败:", err) // 会输出非法延迟的错误
- }
- }