rocket_mq_test.go 7.2 KB


  1. package rocketmq
  2. import (
  3. "context"
  4. "github.com/apache/rocketmq-client-go/v2/consumer"
  5. "github.com/apache/rocketmq-client-go/v2/primitive"
  6. "github.com/stretchr/testify/assert"
  7. "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
  8. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/properties"
  9. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  10. "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
  11. "git.shuncheng.lu/bigthing/gocommon/pkg/trace"
  12. "sync"
  13. "testing"
  14. "time"
  15. )
// Cluster used for these tests: https://github.com/Anthony-Dong/docker-rocketmq-cluster
var (
	// testTopicName is the topic the tests below publish to and subscribe on.
	testTopicName = "test_topic"
	// tag lists the message tags used by the SendWithTag calls and by the
	// tag-based subscriptions in the tests below.
	tag = []string{"tag-1", "tag-2", "tag-3"}
	// config is the properties text handed to mockConfig, which parses it and
	// serves its values through conf.MustValue. TestConfig asserts on the
	// rocket_mq.* entries, so keep the two in sync.
	config = `
rocket_mq.host = 127.0.0.1:9871,127.0.0.1:9872
rocket_mq.consumer.group_name = go_common_consumer
rocket_mq.producer.group_name = go_common_producer
rocket_mq.access_key = rmq_access_key
rocket_mq.secret_key = rmq_secret_key
rocket_mq.skywalking.enable = true
rocket_mq.trace.enable = true
rocket_mq.log.filename = /data/log/go-common/rocket-mq.log
rocket_mq.log.level = error
# skywalking
trace.sky_walking_host = 10.100.72.97:11800
trace.application_name = go-common_dev
`
	// traceInit guards the trace.Init call in mockConfig so it is attempted at
	// most once, no matter how many tests call mockConfig.
	traceInit sync.Once
)
  36. func mockConfig(t testing.TB, config string) {
  37. pro, err := properties.ReadFromString(config)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. conf.MustValue = func(section, key string, defaultVal ...string) string {
  42. return pro.GetString(section+"."+key, defaultVal...)
  43. }
  44. infoc = func(ctx context.Context, format string, v ...interface{}) {
  45. util.Infof("[trace_id="+logger.GetTraceId(ctx)+"] "+format, v...)
  46. }
  47. errorc = func(ctx context.Context, format string, v ...interface{}) {
  48. util.Errorf("[trace_id="+logger.GetTraceId(ctx)+"] "+format, v)
  49. }
  50. // 如果配置有,则初始化
  51. if pro.GetBool("rocket_mq.skywalking.enable") && pro.GetString("trace.sky_walking_host") != "" && pro.GetString("trace.application_name") != "" {
  52. traceInit.Do(func() {
  53. if err := trace.Init(); err != nil {
  54. t.Fatal(err)
  55. }
  56. })
  57. }
  58. }
  59. func TestConfig(t *testing.T) {
  60. mockConfig(t, config)
  61. config, err := GetRocketMQConfig(ConsumerMQ, conf.MustValue)
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. assert.Equal(t, config.Host, []string{"127.0.0.1:9871", "127.0.0.1:9872"})
  66. assert.Equal(t, config.ConsumerGroupName, "go_common_consumer")
  67. assert.Equal(t, config.ProducerGroupName, "go_common_producer")
  68. assert.Equal(t, config.credentials, primitive.Credentials{AccessKey: "rmq_access_key", SecretKey: "rmq_secret_key"})
  69. assert.Equal(t, config.SkyWalkingEnable, true)
  70. assert.Equal(t, config.TraceEnable, true)
  71. assert.Equal(t, config.MQType, ConsumerMQ)
  72. }
  73. /**
  74. // 无异常
  75. // go test -run=none -bench=BenchmarkProducer -benchmem ./pkg/mq/rocketmq
  76. goos: darwin
  77. goarch: amd64
  78. pkg: git.shuncheng.lu/bigthing/gocommon/pkg/mq/rocketmq
  79. BenchmarkProducer-12 272 4584084 ns/op 29459 B/op 311 allocs/op
  80. PASS
  81. ok git.shuncheng.lu/bigthing/gocommon/pkg/mq/rocketmq 2.143s
  82. */
  83. func BenchmarkProducer(b *testing.B) {
  84. mockConfig(b, config)
  85. if err := InitProducer(); err != nil {
  86. b.Fatal(err)
  87. }
  88. for i := 0; i < b.N; i++ {
  89. func() {
  90. ctx, span := trace.MockContext(context.Background(), "SendMessageV2")
  91. defer span.End()
  92. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  93. if _, err := GetProducer().SendMessageV2(ctx, primitive.NewMessage(testTopicName, []byte("SendMessageV2"))); err != nil {
  94. b.Fatal(err)
  95. }
  96. }()
  97. }
  98. }
  99. // producer test
  100. func TestProducerApi(t *testing.T) {
  101. mockConfig(t, config)
  102. if err := InitProducer(); err != nil {
  103. t.Fatal(err)
  104. }
  105. {
  106. // 业务中一般都有middleware 提供span,所以这里不需要
  107. ctx, span := trace.MockContext(context.Background(), "Send")
  108. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  109. if err := GetProducer().Send(ctx, testTopicName, []byte("Send")); err != nil {
  110. t.Fatal(err)
  111. }
  112. span.End()
  113. }
  114. {
  115. ctx, span := trace.MockContext(context.Background(), "SendV2")
  116. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  117. if _, err := GetProducer().SendV2(ctx, testTopicName, []byte("SendV2")); err != nil {
  118. t.Fatal(err)
  119. }
  120. span.End()
  121. }
  122. {
  123. ctx, span := trace.MockContext(context.Background(), "SendMessage")
  124. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  125. if err := GetProducer().SendMessage(ctx, primitive.NewMessage(testTopicName, []byte("SendMessage"))); err != nil {
  126. t.Fatal(err)
  127. }
  128. span.End()
  129. }
  130. {
  131. ctx, span := trace.MockContext(context.Background(), "SendMessageV2")
  132. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  133. if _, err := GetProducer().SendMessageV2(ctx, primitive.NewMessage(testTopicName, []byte("SendMessageV2"))); err != nil {
  134. t.Fatal(err)
  135. }
  136. span.End()
  137. }
  138. {
  139. ctx, span := trace.MockContext(context.Background(), "SendWithTag")
  140. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  141. if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[0]); err != nil {
  142. t.Fatal(err)
  143. }
  144. span.End()
  145. }
  146. {
  147. ctx, span := trace.MockContext(context.Background(), "SendWithTag")
  148. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  149. if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[1]); err != nil {
  150. t.Fatal(err)
  151. }
  152. span.End()
  153. }
  154. {
  155. ctx, span := trace.MockContext(context.Background(), "SendWithTag")
  156. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  157. if _, err := GetProducer().SendWithTag(ctx, testTopicName, []byte("SendWithTag"), tag[2]); err != nil {
  158. t.Fatal(err)
  159. }
  160. span.End()
  161. }
  162. {
  163. ctx, span := trace.MockContext(context.Background(), "SendSyncMulti")
  164. ctx = context.WithValue(ctx, logger.TraceIdKey, trace.GetTraceId(ctx))
  165. if _, err := GetProducer().GetRocketMQProducer().SendSync(ctx, primitive.NewMessage(testTopicName, []byte("SendSyncMulti-1")), primitive.NewMessage(testTopicName, []byte("SendSyncMulti-2"))); err != nil {
  166. t.Fatal(err)
  167. }
  168. span.End()
  169. }
  170. }
  171. func TestConsumerApi(t *testing.T) {
  172. mockConfig(t, config)
  173. //TestProducerApi(t)
  174. if err := InitConsumer(); err != nil {
  175. t.Fatal(err)
  176. }
  177. if err := GetConsumer().Subscribe(testTopicName, func(ctx context.Context, message *primitive.MessageExt) {
  178. //return nil
  179. }); err != nil {
  180. t.Fatal(err)
  181. }
  182. if err := StartConsumer(); err != nil {
  183. t.Fatal(err)
  184. }
  185. time.Sleep(time.Second * 5)
  186. }
  187. func TestConsumerMessageSelector(t *testing.T) {
  188. mockConfig(t, config)
  189. TestProducerApi(t)
  190. if err := InitConsumer(); err != nil {
  191. t.Fatal(err)
  192. }
  193. if err := GetConsumer().SubscribeWithMessageSelector(testTopicName, consumer.MessageSelector{Type: consumer.TAG, Expression: tag[0]}, func(ctx context.Context, message *primitive.MessageExt) {
  194. //return nil
  195. }); err != nil {
  196. t.Fatal(err)
  197. }
  198. if err := StartConsumer(); err != nil {
  199. t.Fatal(err)
  200. }
  201. time.Sleep(time.Second * 5)
  202. }
  203. func TestSubscribeWithTagSelector(t *testing.T) {
  204. mockConfig(t, config)
  205. TestProducerApi(t)
  206. if err := InitConsumer(); err != nil {
  207. t.Fatal(err)
  208. }
  209. if err := GetConsumer().SubscribeWithTagSelector(testTopicName, func(ctx context.Context, message *primitive.MessageExt) {
  210. //return nil
  211. }, tag[0], tag[1]); err != nil {
  212. t.Fatal(err)
  213. }
  214. if err := StartConsumer(); err != nil {
  215. t.Fatal(err)
  216. }
  217. time.Sleep(time.Second * 10)
  218. }