producer_api.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package rocketmq
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/apache/rocketmq-client-go/v2"
  6. "github.com/apache/rocketmq-client-go/v2/primitive"
  7. "github.com/apache/rocketmq-client-go/v2/producer"
  8. )
  9. type Producer interface {
  10. // 定制化生产者使用(对于事务消息,目前Go的Sdk不成熟)
  11. GetRocketMQProducer() rocketmq.Producer
  12. // 发送消息
  13. Send(ctx context.Context, topic string, message []byte) error
  14. SendV2(ctx context.Context, topic string, message []byte) (*primitive.SendResult, error)
  15. // 发送消息(由于批量消息有特殊的加工,所以目前API不直接提供批量消息的封装,原因如下:)
  16. // 1、发送批量消息是拿不到message-id的
  17. // 2、发送批量消息interceptor 的req会是一个Batch的message,具体可以看 github.com/apache/rocketmq-client-go/v2@v2.0.0/producer/producer.go:114
  18. SendMessage(ctx context.Context, message *primitive.Message) error
  19. // 获取消息结果(结果可以获取偏移量)
  20. SendMessageV2(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error)
  21. // 发送消息携带tag
  22. SendWithTag(ctx context.Context, topic string, message []byte, tag string) (*primitive.SendResult, error)
  23. }
  24. type rocketMqProducer struct {
  25. producer rocketmq.Producer
  26. }
  27. // 顺便启动
  28. func newProducer(configs ...producer.Option) (*rocketMqProducer, error) {
  29. pro, err := rocketmq.NewProducer(configs...)
  30. if err != nil {
  31. return nil, err
  32. }
  33. infoc(context.Background(), "[RocketMq-Producer] init success, config: %#v", pro)
  34. // start
  35. if err := pro.Start(); err != nil {
  36. errorc(context.Background(), "[RocketMq-Producer] init err, err: %v", err)
  37. return nil, err
  38. }
  39. infoc(context.Background(), "[RocketMq-Producer] start success !")
  40. return &rocketMqProducer{producer: pro}, nil
  41. }
  42. // 获取producer
  43. func (p *rocketMqProducer) GetRocketMQProducer() rocketmq.Producer {
  44. return p.producer
  45. }
  46. // @after
  47. // 发送消息 (只允许启动后发送)
  48. func (p *rocketMqProducer) SendMessage(ctx context.Context, message *primitive.Message) error {
  49. if _, err := p.SendMessageResult(ctx, message); err != nil {
  50. return err
  51. }
  52. return nil
  53. }
  54. // @after
  55. // 发送消息 (只允许启动后发送)
  56. func (p *rocketMqProducer) Send(ctx context.Context, topic string, message []byte) error {
  57. return p.SendMessage(ctx, primitive.NewMessage(topic, message))
  58. }
  59. // @after
  60. // 发送消息+结果 (只允许启动后发送)
  61. func (p *rocketMqProducer) SendV2(ctx context.Context, topic string, message []byte) (*primitive.SendResult, error) {
  62. return p.SendMessageResult(ctx, primitive.NewMessage(topic, message))
  63. }
  64. // @after
  65. // 发送消息+结果 (只允许启动后发送)
  66. func (p *rocketMqProducer) SendWithTag(ctx context.Context, topic string, message []byte, tag string) (*primitive.SendResult, error) {
  67. msg := primitive.NewMessage(topic, message)
  68. msg = msg.WithTag(tag)
  69. return p.SendMessageResult(ctx, msg)
  70. }
  71. // @after
  72. // 发送消息+结果 (只允许启动后发送)
  73. func (p *rocketMqProducer) SendMessageV2(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) {
  74. return p.SendMessageResult(ctx, message)
  75. }
  76. // @after
  77. // 发送消息+结果 (只允许启动后发送)
  78. func (p *rocketMqProducer) SendMessageResult(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) {
  79. if message == nil {
  80. return nil, errors.New("the rocket mq sent message is nil")
  81. }
  82. infoc(ctx, "[RocketMq-Producer] sync send msg, message: %s", message)
  83. result, err := p.producer.SendSync(ctx, message)
  84. if err != nil {
  85. errorc(ctx, "[RocketMq-Producer] sync send err, err: %v", err)
  86. return nil, err
  87. }
  88. infoc(ctx, "[RocketMq-Producer] sync send msg success, queue: %d, offset: %d, msg_id: %s", result.MessageQueue.QueueId, result.QueueOffset, result.MsgID)
  89. return result, nil
  90. }