trace.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package rocketmq
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  7. "git.shuncheng.lu/bigthing/gocommon/pkg/trace"
  8. "github.com/SkyAPM/go2sky"
  9. "github.com/SkyAPM/go2sky/propagation"
  10. "github.com/apache/rocketmq-client-go/v2/primitive"
  11. )
/*
Sample SkyWalking traces produced for RocketMQ:
Overview: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/d1abf52d32e1476b9ca597d3bbc1d2a8.png
Producer: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/dd1a106c07494b619d84c41b7aa5585a.png
Consumer: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/2378d0f6a726441097da2b578c9f607c.png
*/
  18. const (
  19. rocketMQProducer = 38
  20. rocketMQConsumer = 39
  21. )
  22. const (
  23. peerName = "No peer"
  24. language = "Go"
  25. timeFormat = "2006-01-02 15:04:05"
  26. )
  27. // trace-config
  28. var (
  29. newSkyWalkingProducerExitSpan = func(ctx context.Context, msg *primitive.Message) go2sky.Span {
  30. span, err := trace.GinNewExitSpan(ctx, "RocketMQ/Producer/"+msg.Topic, peerName, func(header string) error {
  31. msg.WithProperty(propagation.Header, header)
  32. return nil
  33. })
  34. if span == nil || err != nil {
  35. return nil
  36. }
  37. return span
  38. }
  39. newSkyWalkingConsumerEntrySpan = func(ctx context.Context, msg *primitive.MessageExt) (go2sky.Span, context.Context) {
  40. span, ctx, err := trace.GinNewEntrySpan(ctx, "RocketMQ/Consumer/"+msg.Topic, func() (s string, e error) {
  41. return msg.GetProperty(propagation.Header), nil
  42. })
  43. if span == nil || err != nil {
  44. return nil, ctx
  45. }
  46. return span, ctx
  47. }
  48. )
  49. func SetNewSkyWalkingConsumerEntrySpan(foo func(ctx context.Context, msg *primitive.MessageExt) (go2sky.Span, context.Context)) {
  50. newSkyWalkingConsumerEntrySpan = foo
  51. }
  52. func SetNewSkyWalkingProducerExitSpan(foo func(ctx context.Context, msg *primitive.Message) go2sky.Span) {
  53. newSkyWalkingProducerExitSpan = foo
  54. }
  55. func milliseconds2String(milliseconds int64) string {
  56. return time.Unix(milliseconds/1000, 0).Format(timeFormat)
  57. }
  58. func GetMessageTraceId(message *primitive.MessageExt) string {
  59. if message == nil {
  60. return ""
  61. }
  62. refSc := &propagation.SpanContext{}
  63. _ = refSc.DecodeSW8(message.GetProperty(propagation.Header))
  64. return refSc.TraceID
  65. }
  66. func getSkyWalkingConsumerTraceInterceptor() func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
  67. return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
  68. var (
  69. spans []go2sky.Span = nil
  70. )
  71. messages, isOk := req.([]*primitive.MessageExt)
  72. if isOk && newSkyWalkingConsumerEntrySpan != nil {
  73. spans = make([]go2sky.Span, 0, len(messages))
  74. for index, _ := range messages {
  75. span, _ := newRocketMQConsumerSpan(ctx, messages[index])
  76. if span != nil {
  77. spans = append(spans, span)
  78. }
  79. }
  80. }
  81. if spans != nil {
  82. defer func() {
  83. for _, elem := range spans {
  84. if elem != nil {
  85. elem.End()
  86. }
  87. }
  88. }()
  89. }
  90. return next(ctx, req, reply)
  91. }
  92. }
  93. func newRocketMQConsumerSpan(ctx context.Context, message *primitive.MessageExt) (go2sky.Span, context.Context) {
  94. span, ctx := newSkyWalkingConsumerEntrySpan(ctx, message)
  95. if span == nil {
  96. return nil, ctx
  97. }
  98. span.SetComponent(rocketMQConsumer)
  99. span.Tag("mq.topic", message.Topic)
  100. span.Tag("mq.msg_id", message.MsgId)
  101. span.Tag("consumer.language", language)
  102. span.Tag("producer.time", milliseconds2String(message.StoreTimestamp))
  103. span.Tag("consumer.time", time.Now().Format(timeFormat))
  104. span.Tag("consumer.hostname", util.CurrentServerHostName())
  105. return span, ctx
  106. }
  107. func getSkyWalkingProducerTraceInterceptor() func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
  108. return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
  109. var (
  110. span go2sky.Span
  111. )
  112. // 消息
  113. msg, isOk := req.(*primitive.Message)
  114. if isOk && newSkyWalkingProducerExitSpan != nil {
  115. span = newSkyWalkingProducerExitSpan(ctx, msg)
  116. }
  117. // end
  118. if span != nil {
  119. defer span.End()
  120. }
  121. // invoke
  122. if err := next(ctx, req, reply); err != nil {
  123. if span != nil {
  124. span.Error(time.Now(), fmt.Sprintf("%v", err))
  125. }
  126. return err
  127. }
  128. // handler result
  129. result, isOk := reply.(*primitive.SendResult)
  130. if isOk && span != nil {
  131. span.SetComponent(rocketMQProducer)
  132. span.Tag("mq.topic", msg.Topic)
  133. if tags := msg.GetTags(); tags != "" {
  134. span.Tag("mq.tags", tags)
  135. }
  136. span.Tag("mq.msg_id", result.MsgID)
  137. span.Tag("mq.broker.name", result.MessageQueue.BrokerName)
  138. span.Tag("mq.queue", util.ToString(result.MessageQueue.QueueId))
  139. span.Tag("mq.offset", util.ToString(result.QueueOffset))
  140. span.Tag("mq.status", util.ToString(result.Status))
  141. span.Tag("mq.trace", util.ToString(result.TraceOn))
  142. span.Tag("producer.language", language)
  143. span.Tag("producer.time", time.Now().Format(timeFormat))
  144. span.Tag("producer.hostname", util.CurrentServerHostName())
  145. }
  146. return nil
  147. }
  148. }