package rocketmq import ( "context" "errors" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) type Producer interface { // 定制化生产者使用(对于事务消息,目前Go的Sdk不成熟) GetRocketMQProducer() rocketmq.Producer // 发送消息 Send(ctx context.Context, topic string, message []byte) error SendV2(ctx context.Context, topic string, message []byte) (*primitive.SendResult, error) // 发送消息(由于批量消息有特殊的加工,所以目前API不直接提供批量消息的封装,原因如下:) // 1、发送批量消息是拿不到message-id的 // 2、发送批量消息interceptor 的req会是一个Batch的message,具体可以看 github.com/apache/rocketmq-client-go/v2@v2.0.0/producer/producer.go:114 SendMessage(ctx context.Context, message *primitive.Message) error // 获取消息结果(结果可以获取偏移量) SendMessageV2(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) // 发送消息携带tag SendWithTag(ctx context.Context, topic string, message []byte, tag string) (*primitive.SendResult, error) } type rocketMqProducer struct { producer rocketmq.Producer } // 顺便启动 func newProducer(configs ...producer.Option) (*rocketMqProducer, error) { pro, err := rocketmq.NewProducer(configs...) if err != nil { return nil, err } infoc(context.Background(), "[RocketMq-Producer] init success, config: %#v", pro) // start if err := pro.Start(); err != nil { errorc(context.Background(), "[RocketMq-Producer] init err, err: %v", err) return nil, err } infoc(context.Background(), "[RocketMq-Producer] start success !") return &rocketMqProducer{producer: pro}, nil } // 获取producer func (p *rocketMqProducer) GetRocketMQProducer() rocketmq.Producer { return p.producer } // @after // 发送消息 (只允许启动后发送) func (p *rocketMqProducer) SendMessage(ctx context.Context, message *primitive.Message) error { if _, err := p.SendMessageResult(ctx, message); err != nil { return err } return nil } // @after // 发送消息 (只允许启动后发送) func (p *rocketMqProducer) Send(ctx context.Context, topic string, message []byte) error { return p.SendMessage(ctx, primitive.NewMessage(topic, message)) } // @after // 发送消息+结果 (只允许启动后发送) func (p *rocketMqProducer) SendV2(ctx context.Context, topic string, message []byte) (*primitive.SendResult, error) { return p.SendMessageResult(ctx, primitive.NewMessage(topic, message)) } // @after // 发送消息+结果 (只允许启动后发送) func (p *rocketMqProducer) SendWithTag(ctx context.Context, topic string, message []byte, tag string) (*primitive.SendResult, error) { msg := primitive.NewMessage(topic, message) msg = msg.WithTag(tag) return p.SendMessageResult(ctx, msg) } // @after // 发送消息+结果 (只允许启动后发送) func (p *rocketMqProducer) SendMessageV2(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) { return p.SendMessageResult(ctx, message) } // @after // 发送消息+结果 (只允许启动后发送) func (p *rocketMqProducer) SendMessageResult(ctx context.Context, message *primitive.Message) (*primitive.SendResult, error) { if message == nil { return nil, errors.New("the rocket mq sent message is nil") } infoc(ctx, "[RocketMq-Producer] sync send msg, message: %s", message) result, err := p.producer.SendSync(ctx, message) if err != nil { errorc(ctx, "[RocketMq-Producer] sync send err, err: %v", err) return nil, err } infoc(ctx, "[RocketMq-Producer] sync send msg success, queue: %d, offset: %d, msg_id: %s", result.MessageQueue.QueueId, result.QueueOffset, result.MsgID) return result, nil }