| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- package rocketmq
- import (
- "context"
- "github.com/apache/rocketmq-client-go/v2/consumer"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/stretchr/testify/assert"
- "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/properties"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
- "git.shuncheng.lu/bigthing/gocommon/pkg/trace"
- "sync"
- "testing"
- "time"
- )
// Cluster used by these tests: https://github.com/Anthony-Dong/docker-rocketmq-cluster

// testTopicName is the topic the tests in this file publish to and subscribe on.
var testTopicName = "test_topic"

// tag lists the message tags used by the tagged-send and tag-selector tests.
var tag = []string{"tag-1", "tag-2", "tag-3"}

// traceInit guards trace.Init so that mockConfig calls it at most once.
var traceInit sync.Once

// config is the properties-format test configuration handed to mockConfig.
// It carries the RocketMQ name-server hosts, group names, credentials,
// tracing/log switches and the SkyWalking settings.
var config = `
rocket_mq.host = 127.0.0.1:9871,127.0.0.1:9872
rocket_mq.consumer.group_name = go_common_consumer
rocket_mq.producer.group_name = go_common_producer
rocket_mq.access_key = rmq_access_key
rocket_mq.secret_key = rmq_secret_key
rocket_mq.skywalking.enable = true
rocket_mq.trace.enable = true
rocket_mq.log.filename = /data/log/go-common/rocket-mq.log
rocket_mq.log.level = error
# skywalking
trace.sky_walking_host = 10.100.72.97:11800
trace.application_name = go-common_dev
`
- func mockConfig(t testing.TB, config string) {
- pro, err := properties.ReadFromString(config)
- if err != nil {
- t.Fatal(err)
- }
- conf.MustValue = func(section, key string, defaultVal ...string) string {
- return pro.GetString(section+"."+key, defaultVal...)
- }
- infoc = func(ctx context.Context, format string, v ...interface{}) {
- util.Infof("[trace_id="+logger.GetTraceId(ctx)+"] "+format, v...)
- }
- errorc = func(ctx context.Context, format string, v ...interface{}) {
- util.Errorf("[trace_id="+logger.GetTraceId(ctx)+"] "+format, v)
- }
- // 如果配置有,则初始化
- if pro.GetBool("rocket_mq.skywalking.enable") && pro.GetString("trace.sky_walking_host") != "" && pro.GetString("trace.application_name") != "" {
- traceInit.Do(func() {
- if err := trace.Init(); err != nil {
- t.Fatal(err)
- }
- })
- }
- }
- func TestConfig(t *testing.T) {
- mockConfig(t, config)
- config, err := GetRocketMQConfig(ConsumerMQ, conf.MustValue)
- if err != nil {
- t.Fatal(err)
- }
- assert.Equal(t, config.Host, []string{"127.0.0.1:9871", "127.0.0.1:9872"})
- assert.Equal(t, config.ConsumerGroupName, "go_common_consumer")
- assert.Equal(t, config.ProducerGroupName, "go_common_producer")
- assert.Equal(t, config.credentials, primitive.Credentials{AccessKey: "rmq_access_key", SecretKey: "rmq_secret_key"})
- assert.Equal(t, config.SkyWalkingEnable, true)
- assert.Equal(t, config.TraceEnable, true)
- assert.Equal(t, config.MQType, ConsumerMQ)
- }
- /**
- // 无异常
- // go test -run=none -bench=BenchmarkProducer -benchmem ./pkg/mq/rocketmq
- goos: darwin
- goarch: amd64
- pkg: git.shuncheng.lu/bigthing/gocommon/pkg/mq/rocketmq
- BenchmarkProducer-12 272 4584084 ns/op 29459 B/op 311 allocs/op
- PASS
- ok git.shuncheng.lu/bigthing/gocommon/pkg/mq/rocketmq 2.143s
- */
- func BenchmarkProducer(b *testing.B) {
- mockConfig(b, config)
- if err := InitProducer(); err != nil {
- b.Fatal(err)
- }
- for i := 0; i < b.N; i++ {
- func() {
- ctx, span := trace.MockContext(context.Background(), "SendMessageV2")
- defer span.End()
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().SendMessageV2(ctx, primitive.NewMessage(testTopicName, []byte("SendMessageV2"))); err != nil {
- b.Fatal(err)
- }
- }()
- }
- }
- // producer test
- func TestProducerApi(t *testing.T) {
- mockConfig(t, config)
- if err := InitProducer(); err != nil {
- t.Fatal(err)
- }
- {
- // 业务中一般都有middleware 提供span,所以这里不需要
- ctx, span := trace.MockContext(context.Background(), "Send")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if err := GetProducer().Send(ctx, testTopicName, []byte("Send")); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendV2")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().SendV2(ctx, testTopicName, []byte("SendV2")); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendMessage")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if err := GetProducer().SendMessage(ctx, primitive.NewMessage(testTopicName, []byte("SendMessage"))); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendMessageV2")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().SendMessageV2(ctx, primitive.NewMessage(testTopicName, []byte("SendMessageV2"))); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendWithTag")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[0]); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendWithTag")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[1]); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendWithTag")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[2]); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- {
- ctx, span := trace.MockContext(context.Background(), "SendSyncMulti")
- ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
- if _, err := GetProducer().GetRocketMQProducer().SendSync(ctx, primitive.NewMessage(testTopicName, []byte("SendSyncMulti-1")), primitive.NewMessage(testTopicName, []byte("SendSyncMulti-2"))); err != nil {
- t.Fatal(err)
- }
- span.End()
- }
- }
- func TestConsumerApi(t *testing.T) {
- mockConfig(t, config)
- //TestProducerApi(t)
- if err := InitConsumer(); err != nil {
- t.Fatal(err)
- }
- if err := GetConsumer().Subscribe(testTopicName, func(ctx context.Context, message *primitive.MessageExt) {
- //return nil
- }); err != nil {
- t.Fatal(err)
- }
- if err := StartConsumer(); err != nil {
- t.Fatal(err)
- }
- time.Sleep(time.Second * 5)
- }
- func TestConsumerMessageSelector(t *testing.T) {
- mockConfig(t, config)
- TestProducerApi(t)
- if err := InitConsumer(); err != nil {
- t.Fatal(err)
- }
- if err := GetConsumer().SubscribeWithMessageSelector(testTopicName, consumer.MessageSelector{Type: consumer.TAG, Expression: tag[0]}, func(ctx context.Context, message *primitive.MessageExt) {
- //return nil
- }); err != nil {
- t.Fatal(err)
- }
- if err := StartConsumer(); err != nil {
- t.Fatal(err)
- }
- time.Sleep(time.Second * 5)
- }
- func TestSubscribeWithTagSelector(t *testing.T) {
- mockConfig(t, config)
- TestProducerApi(t)
- if err := InitConsumer(); err != nil {
- t.Fatal(err)
- }
- if err := GetConsumer().SubscribeWithTagSelector(testTopicName, func(ctx context.Context, message *primitive.MessageExt) {
- //return nil
- }, tag[0], tag[1]); err != nil {
- t.Fatal(err)
- }
- if err := StartConsumer(); err != nil {
- t.Fatal(err)
- }
- time.Sleep(time.Second * 10)
- }
|