| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package rocketmq
- import (
- "context"
- "fmt"
- "time"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- "git.shuncheng.lu/bigthing/gocommon/pkg/trace"
- "github.com/SkyAPM/go2sky"
- "github.com/SkyAPM/go2sky/propagation"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- )
- /**
- skywalking rocket-mq trace效果:
- 整体图:https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/d1abf52d32e1476b9ca597d3bbc1d2a8.png
- 生产者图:https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/dd1a106c07494b619d84c41b7aa5585a.png
- 消费者图:https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/2378d0f6a726441097da2b578c9f607c.png
- */
- const (
- rocketMQProducer = 38
- rocketMQConsumer = 39
- )
- const (
- peerName = "No peer"
- language = "Go"
- timeFormat = "2006-01-02 15:04:05"
- )
- // trace-config
- var (
- newSkyWalkingProducerExitSpan = func(ctx context.Context, msg *primitive.Message) go2sky.Span {
- span, err := trace.GinNewExitSpan(ctx, "RocketMQ/Producer/"+msg.Topic, peerName, func(header string) error {
- msg.WithProperty(propagation.Header, header)
- return nil
- })
- if span == nil || err != nil {
- return nil
- }
- return span
- }
- newSkyWalkingConsumerEntrySpan = func(ctx context.Context, msg *primitive.MessageExt) (go2sky.Span, context.Context) {
- span, ctx, err := trace.GinNewEntrySpan(ctx, "RocketMQ/Consumer/"+msg.Topic, func() (s string, e error) {
- return msg.GetProperty(propagation.Header), nil
- })
- if span == nil || err != nil {
- return nil, ctx
- }
- return span, ctx
- }
- )
- func SetNewSkyWalkingConsumerEntrySpan(foo func(ctx context.Context, msg *primitive.MessageExt) (go2sky.Span, context.Context)) {
- newSkyWalkingConsumerEntrySpan = foo
- }
- func SetNewSkyWalkingProducerExitSpan(foo func(ctx context.Context, msg *primitive.Message) go2sky.Span) {
- newSkyWalkingProducerExitSpan = foo
- }
- func milliseconds2String(milliseconds int64) string {
- return time.Unix(milliseconds/1000, 0).Format(timeFormat)
- }
- func GetMessageTraceId(message *primitive.MessageExt) string {
- if message == nil {
- return ""
- }
- refSc := &propagation.SpanContext{}
- _ = refSc.DecodeSW8(message.GetProperty(propagation.Header))
- return refSc.TraceID
- }
- func getSkyWalkingConsumerTraceInterceptor() func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
- return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
- var (
- spans []go2sky.Span = nil
- )
- messages, isOk := req.([]*primitive.MessageExt)
- if isOk && newSkyWalkingConsumerEntrySpan != nil {
- spans = make([]go2sky.Span, 0, len(messages))
- for index, _ := range messages {
- span, _ := newRocketMQConsumerSpan(ctx, messages[index])
- if span != nil {
- spans = append(spans, span)
- }
- }
- }
- if spans != nil {
- defer func() {
- for _, elem := range spans {
- if elem != nil {
- elem.End()
- }
- }
- }()
- }
- return next(ctx, req, reply)
- }
- }
- func newRocketMQConsumerSpan(ctx context.Context, message *primitive.MessageExt) (go2sky.Span, context.Context) {
- span, ctx := newSkyWalkingConsumerEntrySpan(ctx, message)
- if span == nil {
- return nil, ctx
- }
- span.SetComponent(rocketMQConsumer)
- span.Tag("mq.topic", message.Topic)
- span.Tag("mq.msg_id", message.MsgId)
- span.Tag("consumer.language", language)
- span.Tag("producer.time", milliseconds2String(message.StoreTimestamp))
- span.Tag("consumer.time", time.Now().Format(timeFormat))
- span.Tag("consumer.hostname", util.CurrentServerHostName())
- return span, ctx
- }
- func getSkyWalkingProducerTraceInterceptor() func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
- return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
- var (
- span go2sky.Span
- )
- // 消息
- msg, isOk := req.(*primitive.Message)
- if isOk && newSkyWalkingProducerExitSpan != nil {
- span = newSkyWalkingProducerExitSpan(ctx, msg)
- }
- // end
- if span != nil {
- defer span.End()
- }
- // invoke
- if err := next(ctx, req, reply); err != nil {
- if span != nil {
- span.Error(time.Now(), fmt.Sprintf("%v", err))
- }
- return err
- }
- // handler result
- result, isOk := reply.(*primitive.SendResult)
- if isOk && span != nil {
- span.SetComponent(rocketMQProducer)
- span.Tag("mq.topic", msg.Topic)
- if tags := msg.GetTags(); tags != "" {
- span.Tag("mq.tags", tags)
- }
- span.Tag("mq.msg_id", result.MsgID)
- span.Tag("mq.broker.name", result.MessageQueue.BrokerName)
- span.Tag("mq.queue", util.ToString(result.MessageQueue.QueueId))
- span.Tag("mq.offset", util.ToString(result.QueueOffset))
- span.Tag("mq.status", util.ToString(result.Status))
- span.Tag("mq.trace", util.ToString(result.TraceOn))
- span.Tag("producer.language", language)
- span.Tag("producer.time", time.Now().Format(timeFormat))
- span.Tag("producer.hostname", util.CurrentServerHostName())
- }
- return nil
- }
- }
|