consumer_api.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package rocketmq
  2. import (
  3. "context"
  4. "errors"
  5. "time"
  6. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  7. "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
  8. "github.com/apache/rocketmq-client-go/v2"
  9. "github.com/apache/rocketmq-client-go/v2/consumer"
  10. "github.com/apache/rocketmq-client-go/v2/primitive"
  11. )
  12. type Consumer interface {
  13. // 如果需要定制化开发需要使用这个
  14. GetRocketMqConsumer() rocketmq.PushConsumer
  15. // 订阅/消费 指定的topic
  16. Subscribe(topic string, handler func(ctx context.Context, message *primitive.MessageExt)) error
  17. // 订阅/消费,可以设置一个选择器
  18. SubscribeWithMessageSelector(topic string, selector consumer.MessageSelector, handler func(ctx context.Context, message *primitive.MessageExt)) error
  19. // 订阅/消费,可以配置多个tag,表示订阅多个tag(注意如果为空,空抛出异常)
  20. SubscribeWithTagSelector(topic string, handler func(ctx context.Context, message *primitive.MessageExt), tag ...string) error
  21. }
  22. // Consumer-API
  23. type rocketMqConsumer struct {
  24. pushConsumer rocketmq.PushConsumer
  25. isStarted bool
  26. }
  27. func newConsumer(configs ...consumer.Option) (*rocketMqConsumer, error) {
  28. pushConsumer, err := consumer.NewPushConsumer(configs...)
  29. if err != nil {
  30. return nil, err
  31. }
  32. infoc(context.Background(), "[RocketMq-Consumer] init success, consumer running info: %#v", pushConsumer.GetConsumerRunningInfo())
  33. return &rocketMqConsumer{
  34. pushConsumer: pushConsumer,
  35. isStarted: false,
  36. }, nil
  37. }
  38. // 启动
  39. func (c *rocketMqConsumer) start() error {
  40. if err := c.pushConsumer.Start(); err != nil {
  41. return err
  42. }
  43. c.isStarted = true
  44. return nil
  45. }
  46. // 获取consumer
  47. func (c *rocketMqConsumer) GetRocketMqConsumer() rocketmq.PushConsumer {
  48. return c.pushConsumer
  49. }
  50. // 订阅(如果启动后再去订阅则无法订阅,只允许启动前设置)
  51. func (c *rocketMqConsumer) Subscribe(topic string, handler func(ctx context.Context, message *primitive.MessageExt)) error {
  52. return c.SubscribeWithMessageSelector(topic, consumer.MessageSelector{}, handler)
  53. }
  54. // 订阅(如果启动后再去订阅则无法订阅,只允许启动前设置)
  55. func (c *rocketMqConsumer) SubscribeWithMessageSelector(topic string, selector consumer.MessageSelector, handler func(ctx context.Context, message *primitive.MessageExt)) error {
  56. if c.isStarted == true {
  57. return consumerStartedError
  58. }
  59. if topic == "" || handler == nil {
  60. return errors.New("rocket-mq consumer subscribe find topic is empty or handler is nil")
  61. }
  62. if err := c.pushConsumer.Subscribe(topic, selector, func(ctx context.Context,
  63. messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
  64. for index, _ := range messages {
  65. func(msg *primitive.MessageExt) {
  66. defer func() {
  67. if err := recover(); err != nil {
  68. errorc(ctx, "[RocketMq-Consumer] end find panic, err: %v", err)
  69. }
  70. }()
  71. start := time.Now()
  72. traceId := GetMessageTraceId(msg)
  73. if traceId == "" {
  74. traceId = logger.GenerateTraceId()
  75. }
  76. ctx := context.WithValue(ctx, loggerTraceIdKey, traceId)
  77. // message_id 为唯一的数据
  78. infoc(ctx, "[RocketMq-Consumer] start, msg_id: %s, topic: %s, queue_id: %d, offset: %d, body: %s, tag: %s", msg.MsgId, msg.Topic, msg.Queue.QueueId, msg.QueueOffset, msg.Body, msg.GetTags())
  79. handler(ctx, msg)
  80. infoc(ctx, "[RocketMq-Consumer] end success, spend: %dms", time.Now().Sub(start).Milliseconds())
  81. }(messages[index])
  82. }
  83. return consumer.ConsumeSuccess, nil
  84. }); err != nil {
  85. errorc(context.Background(), "[RocketMq-Consumer] subscribe topic err, topic: %s, err: %v", topic, err)
  86. return err
  87. }
  88. infoc(context.Background(), "[RocketMq-Consumer] subscribe topic success, topic: %s, selector: %#v", topic, selector)
  89. return nil
  90. }
  91. // 订阅(如果启动后再去订阅则无法订阅,只允许启动前设置)
  92. func (c *rocketMqConsumer) SubscribeWithTagSelector(topic string, handler func(ctx context.Context, message *primitive.MessageExt), tag ...string) error {
  93. if tag == nil || len(tag) == 0 {
  94. return errors.New("rocket mq can not select tag with nil tag, please check invoke SubscribeWithTagSelector func")
  95. }
  96. return c.SubscribeWithMessageSelector(topic, consumer.MessageSelector{
  97. Type: consumer.TAG,
  98. Expression: util.JoinIgnoreSpace(tag, " || "), // eg: "TagA || TagC" 表示可以订阅 TagA或者TagC , "*"表示订阅全部
  99. }, handler)
  100. }