package rocketmq import ( "context" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/stretchr/testify/assert" "git.shuncheng.lu/bigthing/gocommon/pkg/conf" "git.shuncheng.lu/bigthing/gocommon/pkg/internal/properties" "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util" "git.shuncheng.lu/bigthing/gocommon/pkg/logger" "git.shuncheng.lu/bigthing/gocommon/pkg/trace" "sync" "testing" "time" ) // 测试使用的集群:https://github.com/Anthony-Dong/docker-rocketmq-cluster var ( testTopicName = "test_topic" tag = []string{"tag-1", "tag-2", "tag-3"} config = ` rocket_mq.host = 127.0.0.1:9871,127.0.0.1:9872 rocket_mq.consumer.group_name = go_common_consumer rocket_mq.producer.group_name = go_common_producer rocket_mq.access_key = rmq_access_key rocket_mq.secret_key = rmq_secret_key rocket_mq.skywalking.enable = true rocket_mq.trace.enable = true rocket_mq.log.filename = /data/log/go-common/rocket-mq.log rocket_mq.log.level = error # skywalking trace.sky_walking_host = 10.100.72.97:11800 trace.application_name = go-common_dev ` traceInit sync.Once ) func mockConfig(t testing.TB, config string) { pro, err := properties.ReadFromString(config) if err != nil { t.Fatal(err) } conf.MustValue = func(section, key string, defaultVal ...string) string { return pro.GetString(section+"."+key, defaultVal...) } infoc = func(ctx context.Context, format string, v ...interface{}) { util.Infof("[trace_id="+logger.GetTraceId(ctx)+"] "+format, v...) } errorc = func(ctx context.Context, format string, v ...interface{}) { util.Errorf("[trace_id="+logger.GetTraceId(ctx)+"] "+format, v) } // 如果配置有,则初始化 if pro.GetBool("rocket_mq.skywalking.enable") && pro.GetString("trace.sky_walking_host") != "" && pro.GetString("trace.application_name") != "" { traceInit.Do(func() { if err := trace.Init(); err != nil { t.Fatal(err) } }) } } func TestConfig(t *testing.T) { mockConfig(t, config) config, err := GetRocketMQConfig(ConsumerMQ, conf.MustValue) if err != nil { t.Fatal(err) } assert.Equal(t, config.Host, []string{"127.0.0.1:9871", "127.0.0.1:9872"}) assert.Equal(t, config.ConsumerGroupName, "go_common_consumer") assert.Equal(t, config.ProducerGroupName, "go_common_producer") assert.Equal(t, config.credentials, primitive.Credentials{AccessKey: "rmq_access_key", SecretKey: "rmq_secret_key"}) assert.Equal(t, config.SkyWalkingEnable, true) assert.Equal(t, config.TraceEnable, true) assert.Equal(t, config.MQType, ConsumerMQ) } /** // 无异常 // go test -run=none -bench=BenchmarkProducer -benchmem ./pkg/mq/rocketmq goos: darwin goarch: amd64 pkg: git.shuncheng.lu/bigthing/gocommon/pkg/mq/rocketmq BenchmarkProducer-12 272 4584084 ns/op 29459 B/op 311 allocs/op PASS ok git.shuncheng.lu/bigthing/gocommon/pkg/mq/rocketmq 2.143s */ func BenchmarkProducer(b *testing.B) { mockConfig(b, config) if err := InitProducer(); err != nil { b.Fatal(err) } for i := 0; i < b.N; i++ { func() { ctx, span := trace.MockContext(context.Background(), "SendMessageV2") defer span.End() ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().SendMessageV2(ctx, primitive.NewMessage(testTopicName, []byte("SendMessageV2"))); err != nil { b.Fatal(err) } }() } } // producer test func TestProducerApi(t *testing.T) { mockConfig(t, config) if err := InitProducer(); err != nil { t.Fatal(err) } { // 业务中一般都有middleware 提供span,所以这里不需要 ctx, span := trace.MockContext(context.Background(), "Send") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if err := GetProducer().Send(ctx, testTopicName, []byte("Send")); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendV2") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().SendV2(ctx, testTopicName, []byte("SendV2")); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendMessage") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if err := GetProducer().SendMessage(ctx, primitive.NewMessage(testTopicName, []byte("SendMessage"))); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendMessageV2") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().SendMessageV2(ctx, primitive.NewMessage(testTopicName, []byte("SendMessageV2"))); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendWithTag") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[0]); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendWithTag") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[1]); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendWithTag") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[2]); err != nil { t.Fatal(err) } span.End() } { ctx, span := trace.MockContext(context.Background(), "SendSyncMulti") ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx)) if _, err := GetProducer().GetRocketMQProducer().SendSync(ctx, primitive.NewMessage(testTopicName, []byte("SendSyncMulti-1")), primitive.NewMessage(testTopicName, []byte("SendSyncMulti-2"))); err != nil { t.Fatal(err) } span.End() } } func TestConsumerApi(t *testing.T) { mockConfig(t, config) //TestProducerApi(t) if err := InitConsumer(); err != nil { t.Fatal(err) } if err := GetConsumer().Subscribe(testTopicName, func(ctx context.Context, message *primitive.MessageExt) { //return nil }); err != nil { t.Fatal(err) } if err := StartConsumer(); err != nil { t.Fatal(err) } time.Sleep(time.Second * 5) } func TestConsumerMessageSelector(t *testing.T) { mockConfig(t, config) TestProducerApi(t) if err := InitConsumer(); err != nil { t.Fatal(err) } if err := GetConsumer().SubscribeWithMessageSelector(testTopicName, consumer.MessageSelector{Type: consumer.TAG, Expression: tag[0]}, func(ctx context.Context, message *primitive.MessageExt) { //return nil }); err != nil { t.Fatal(err) } if err := StartConsumer(); err != nil { t.Fatal(err) } time.Sleep(time.Second * 5) } func TestSubscribeWithTagSelector(t *testing.T) { mockConfig(t, config) TestProducerApi(t) if err := InitConsumer(); err != nil { t.Fatal(err) } if err := GetConsumer().SubscribeWithTagSelector(testTopicName, func(ctx context.Context, message *primitive.MessageExt) { //return nil }, tag[0], tag[1]); err != nil { t.Fatal(err) } if err := StartConsumer(); err != nil { t.Fatal(err) } time.Sleep(time.Second * 10) }