package rocketmq

import (
	"context"
	"errors"
	"time"

	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
	"git.shuncheng.lu/bigthing/gocommon/pkg/logger"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

// Consumer is the subscription API exposed on top of a RocketMQ push consumer.
type Consumer interface {
	// GetRocketMqConsumer returns the underlying push consumer, for callers
	// that need customization beyond what this interface offers.
	GetRocketMqConsumer() rocketmq.PushConsumer
	// Subscribe consumes the given topic with an empty message selector.
	Subscribe(topic string, handler func(ctx context.Context, message *primitive.MessageExt)) error
	// SubscribeWithMessageSelector consumes the given topic, filtered by the
	// supplied selector.
	SubscribeWithMessageSelector(topic string, selector consumer.MessageSelector, handler func(ctx context.Context, message *primitive.MessageExt)) error
	// SubscribeWithTagSelector consumes the given topic, filtered by one or
	// more tags. An error is returned when no tag is supplied.
	SubscribeWithTagSelector(topic string, handler func(ctx context.Context, message *primitive.MessageExt), tag ...string) error
}

// Compile-time check that rocketMqConsumer implements Consumer.
var _ Consumer = (*rocketMqConsumer)(nil)

// rocketMqConsumer is the Consumer implementation backed by a RocketMQ push
// consumer.
type rocketMqConsumer struct {
	pushConsumer rocketmq.PushConsumer
	// isStarted is set by start; subscriptions are rejected once it is true.
	isStarted bool
}

// newConsumer builds a push consumer from the given options and wraps it in a
// not-yet-started rocketMqConsumer. The construction error, if any, is
// returned unchanged.
func newConsumer(configs ...consumer.Option) (*rocketMqConsumer, error) {
	pushConsumer, err := consumer.NewPushConsumer(configs...)
	if err != nil {
		return nil, err
	}
	infoc(context.Background(), "[RocketMq-Consumer] init success, consumer running info: %#v", pushConsumer.GetConsumerRunningInfo())
	return &rocketMqConsumer{
		pushConsumer: pushConsumer,
		isStarted:    false,
	}, nil
}

// start starts the underlying push consumer and, on success, marks this
// consumer as started so that later subscribe calls are rejected.
func (c *rocketMqConsumer) start() error {
	if err := c.pushConsumer.Start(); err != nil {
		return err
	}
	c.isStarted = true
	return nil
}

// GetRocketMqConsumer returns the underlying push consumer.
func (c *rocketMqConsumer) GetRocketMqConsumer() rocketmq.PushConsumer {
	return c.pushConsumer
}

// Subscribe registers handler for topic using an empty message selector.
// Subscriptions must be set up before the consumer is started; see
// SubscribeWithMessageSelector for the returned errors.
func (c *rocketMqConsumer) Subscribe(topic string, handler func(ctx context.Context, message *primitive.MessageExt)) error {
	return c.SubscribeWithMessageSelector(topic, consumer.MessageSelector{}, handler)
}

// SubscribeWithMessageSelector registers handler for topic, filtered by
// selector. Subscriptions must be set up before the consumer is started.
//
// It returns consumerStartedError if the consumer has already been started,
// an error if topic is empty or handler is nil, and otherwise whatever error
// the underlying Subscribe call reports.
//
// Each delivered message is handled in turn: a trace ID is taken from the
// message (or generated when absent) and attached to the context passed to
// handler. A panic in handler is recovered and logged; the batch is always
// acknowledged with ConsumeSuccess, including after a recovered panic.
func (c *rocketMqConsumer) SubscribeWithMessageSelector(topic string, selector consumer.MessageSelector, handler func(ctx context.Context, message *primitive.MessageExt)) error {
	if c.isStarted {
		return consumerStartedError
	}
	if topic == "" || handler == nil {
		return errors.New("rocket-mq consumer subscribe find topic is empty or handler is nil")
	}

	consume := func(ctx context.Context, messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range messages {
			// Run each message in its own function so that the deferred
			// recover fires per message rather than once per batch.
			func(msg *primitive.MessageExt) {
				// logCtx starts as the incoming context and is replaced by
				// the trace-carrying context once the trace ID is known. The
				// deferred closure reads it at panic time, so the panic log
				// carries the trace ID whenever one has been attached.
				logCtx := ctx
				defer func() {
					if r := recover(); r != nil {
						errorc(logCtx, "[RocketMq-Consumer] end find panic, err: %v", r)
					}
				}()

				start := time.Now()
				traceID := GetMessageTraceId(msg)
				if traceID == "" {
					traceID = logger.GenerateTraceId()
				}
				logCtx = context.WithValue(ctx, loggerTraceIdKey, traceID)

				// msg_id is the unique identifier of the message.
				infoc(logCtx, "[RocketMq-Consumer] start, msg_id: %s, topic: %s, queue_id: %d, offset: %d, body: %s, tag: %s", msg.MsgId, msg.Topic, msg.Queue.QueueId, msg.QueueOffset, msg.Body, msg.GetTags())
				handler(logCtx, msg)
				infoc(logCtx, "[RocketMq-Consumer] end success, spend: %dms", time.Since(start).Milliseconds())
			}(msg)
		}
		return consumer.ConsumeSuccess, nil
	}

	if err := c.pushConsumer.Subscribe(topic, selector, consume); err != nil {
		errorc(context.Background(), "[RocketMq-Consumer] subscribe topic err, topic: %s, err: %v", topic, err)
		return err
	}
	infoc(context.Background(), "[RocketMq-Consumer] subscribe topic success, topic: %s, selector: %#v", topic, selector)
	return nil
}

// SubscribeWithTagSelector registers handler for topic, filtered by the given
// tags. Subscriptions must be set up before the consumer is started.
//
// It returns an error when no tag is supplied; otherwise it delegates to
// SubscribeWithMessageSelector with a TAG selector whose expression is the
// tags joined by " || ".
func (c *rocketMqConsumer) SubscribeWithTagSelector(topic string, handler func(ctx context.Context, message *primitive.MessageExt), tag ...string) error {
	if len(tag) == 0 {
		return errors.New("rocket mq can not select tag with nil tag, please check invoke SubscribeWithTagSelector func")
	}
	return c.SubscribeWithMessageSelector(topic, consumer.MessageSelector{
		Type: consumer.TAG,
		// e.g. "TagA || TagC" subscribes to TagA or TagC; "*" subscribes to all.
		// NOTE(review): JoinIgnoreSpace presumably skips blank entries — confirm in util.
		Expression: util.JoinIgnoreSpace(tag, " || "),
	}, handler)
}