| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package rocketmq
- import (
- "context"
- "errors"
- "time"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/consumer"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- )
- type Consumer interface {
- // 如果需要定制化开发需要使用这个
- GetRocketMqConsumer() rocketmq.PushConsumer
- // 订阅/消费 指定的topic
- Subscribe(topic string, handler func(ctx context.Context, message *primitive.MessageExt)) error
- // 订阅/消费,可以设置一个选择器
- SubscribeWithMessageSelector(topic string, selector consumer.MessageSelector, handler func(ctx context.Context, message *primitive.MessageExt)) error
- // 订阅/消费,可以配置多个tag,表示订阅多个tag(注意如果为空,空抛出异常)
- SubscribeWithTagSelector(topic string, handler func(ctx context.Context, message *primitive.MessageExt), tag ...string) error
- }
- // Consumer-API
- type rocketMqConsumer struct {
- pushConsumer rocketmq.PushConsumer
- isStarted bool
- }
- func newConsumer(configs ...consumer.Option) (*rocketMqConsumer, error) {
- pushConsumer, err := consumer.NewPushConsumer(configs...)
- if err != nil {
- return nil, err
- }
- infoc(context.Background(), "[RocketMq-Consumer] init success, consumer running info: %#v", pushConsumer.GetConsumerRunningInfo())
- return &rocketMqConsumer{
- pushConsumer: pushConsumer,
- isStarted: false,
- }, nil
- }
- // 启动
- func (c *rocketMqConsumer) start() error {
- if err := c.pushConsumer.Start(); err != nil {
- return err
- }
- c.isStarted = true
- return nil
- }
- // 获取consumer
- func (c *rocketMqConsumer) GetRocketMqConsumer() rocketmq.PushConsumer {
- return c.pushConsumer
- }
- // 订阅(如果启动后再去订阅则无法订阅,只允许启动前设置)
- func (c *rocketMqConsumer) Subscribe(topic string, handler func(ctx context.Context, message *primitive.MessageExt)) error {
- return c.SubscribeWithMessageSelector(topic, consumer.MessageSelector{}, handler)
- }
- // 订阅(如果启动后再去订阅则无法订阅,只允许启动前设置)
- func (c *rocketMqConsumer) SubscribeWithMessageSelector(topic string, selector consumer.MessageSelector, handler func(ctx context.Context, message *primitive.MessageExt)) error {
- if c.isStarted == true {
- return consumerStartedError
- }
- if topic == "" || handler == nil {
- return errors.New("rocket-mq consumer subscribe find topic is empty or handler is nil")
- }
- if err := c.pushConsumer.Subscribe(topic, selector, func(ctx context.Context,
- messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
- for index, _ := range messages {
- func(msg *primitive.MessageExt) {
- defer func() {
- if err := recover(); err != nil {
- errorc(ctx, "[RocketMq-Consumer] end find panic, err: %v", err)
- }
- }()
- start := time.Now()
- traceId := GetMessageTraceId(msg)
- if traceId == "" {
- traceId = logger.GenerateTraceId()
- }
- ctx := context.WithValue(ctx, loggerTraceIdKey, traceId)
- // message_id 为唯一的数据
- infoc(ctx, "[RocketMq-Consumer] start, msg_id: %s, topic: %s, queue_id: %d, offset: %d, body: %s, tag: %s", msg.MsgId, msg.Topic, msg.Queue.QueueId, msg.QueueOffset, msg.Body, msg.GetTags())
- handler(ctx, msg)
- infoc(ctx, "[RocketMq-Consumer] end success, spend: %dms", time.Now().Sub(start).Milliseconds())
- }(messages[index])
- }
- return consumer.ConsumeSuccess, nil
- }); err != nil {
- errorc(context.Background(), "[RocketMq-Consumer] subscribe topic err, topic: %s, err: %v", topic, err)
- return err
- }
- infoc(context.Background(), "[RocketMq-Consumer] subscribe topic success, topic: %s, selector: %#v", topic, selector)
- return nil
- }
- // 订阅(如果启动后再去订阅则无法订阅,只允许启动前设置)
- func (c *rocketMqConsumer) SubscribeWithTagSelector(topic string, handler func(ctx context.Context, message *primitive.MessageExt), tag ...string) error {
- if tag == nil || len(tag) == 0 {
- return errors.New("rocket mq can not select tag with nil tag, please check invoke SubscribeWithTagSelector func")
- }
- return c.SubscribeWithMessageSelector(topic, consumer.MessageSelector{
- Type: consumer.TAG,
- Expression: util.JoinIgnoreSpace(tag, " || "), // eg: "TagA || TagC" 表示可以订阅 TagA或者TagC , "*"表示订阅全部
- }, handler)
- }
|