| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package rocketmq
- import (
- "context"
- "errors"
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/producer"
- )
- type Producer interface {
- // 定制化生产者使用(对于事务消息,目前Go的Sdk不成熟)
- GetRocketMQProducer() rocketmq.Producer
- // 发送消息
- Send(ctx context.Context, topic string, message []byte) error
- SendV2(ctx context.Context, topic string, message []byte) (*primitive.SendResult, error)
- // 发送消息(由于批量消息有特殊的加工,所以目前API不直接提供批量消息的封装,原因如下:)
- // 1、发送批量消息是拿不到message-id的
- // 2、发送批量消息interceptor 的req会是一个Batch的message,具体可以看 github.com/apache/rocketmq-client-go/v2@v2.0.0/producer/producer.go:114
- SendMessage(ctx context.Context, message *primitive.Message) error
- // 获取消息结果(结果可以获取偏移量)
- SendMessageV2(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error)
- // 发送消息携带tag
- SendWithTag(ctx context.Context, topic string, message []byte, tag string) (*primitive.SendResult, error)
- }
- type rocketMqProducer struct {
- producer rocketmq.Producer
- }
- // 顺便启动
- func newProducer(configs ...producer.Option) (*rocketMqProducer, error) {
- pro, err := rocketmq.NewProducer(configs...)
- if err != nil {
- return nil, err
- }
- infoc(context.Background(), "[RocketMq-Producer] init success, config: %#v", pro)
- // start
- if err := pro.Start(); err != nil {
- errorc(context.Background(), "[RocketMq-Producer] init err, err: %v", err)
- return nil, err
- }
- infoc(context.Background(), "[RocketMq-Producer] start success !")
- return &rocketMqProducer{producer: pro}, nil
- }
- // 获取producer
- func (p *rocketMqProducer) GetRocketMQProducer() rocketmq.Producer {
- return p.producer
- }
- // @after
- // 发送消息 (只允许启动后发送)
- func (p *rocketMqProducer) SendMessage(ctx context.Context, message *primitive.Message) error {
- if _, err := p.SendMessageResult(ctx, message); err != nil {
- return err
- }
- return nil
- }
- // @after
- // 发送消息 (只允许启动后发送)
- func (p *rocketMqProducer) Send(ctx context.Context, topic string, message []byte) error {
- return p.SendMessage(ctx, primitive.NewMessage(topic, message))
- }
- // @after
- // 发送消息+结果 (只允许启动后发送)
- func (p *rocketMqProducer) SendV2(ctx context.Context, topic string, message []byte) (*primitive.SendResult, error) {
- return p.SendMessageResult(ctx, primitive.NewMessage(topic, message))
- }
- // @after
- // 发送消息+结果 (只允许启动后发送)
- func (p *rocketMqProducer) SendWithTag(ctx context.Context, topic string, message []byte, tag string) (*primitive.SendResult, error) {
- msg := primitive.NewMessage(topic, message)
- msg = msg.WithTag(tag)
- return p.SendMessageResult(ctx, msg)
- }
- // @after
- // 发送消息+结果 (只允许启动后发送)
- func (p *rocketMqProducer) SendMessageV2(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) {
- return p.SendMessageResult(ctx, message)
- }
- // @after
- // 发送消息+结果 (只允许启动后发送)
- func (p *rocketMqProducer) SendMessageResult(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) {
- if message == nil {
- return nil, errors.New("the rocket mq sent message is nil")
- }
- infoc(ctx, "[RocketMq-Producer] sync send msg, message: %s", message)
- result, err := p.producer.SendSync(ctx, message)
- if err != nil {
- errorc(ctx, "[RocketMq-Producer] sync send err, err: %v", err)
- return nil, err
- }
- infoc(ctx, "[RocketMq-Producer] sync send msg success, queue: %d, offset: %d, msg_id: %s", result.MessageQueue.QueueId, result.QueueOffset, result.MsgID)
- return result, nil
- }
|