package rocketmq

import (
	"context"
	"fmt"
	"time"

	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
	"git.shuncheng.lu/bigthing/gocommon/pkg/trace"
	"github.com/SkyAPM/go2sky"
	"github.com/SkyAPM/go2sky/propagation"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

/*
Screenshots of the resulting SkyWalking RocketMQ traces:

Overview: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/d1abf52d32e1476b9ca597d3bbc1d2a8.png
Producer: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/dd1a106c07494b619d84c41b7aa5585a.png
Consumer: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/2378d0f6a726441097da2b578c9f607c.png
*/

// Component IDs passed to Span.SetComponent for producer and consumer spans.
// NOTE(review): presumably these match the RocketMQ entries of SkyWalking's
// component library; verify against the deployed OAP version.
const (
	rocketMQProducer = 38
	rocketMQConsumer = 39
)

const (
	// peerName is the peer handed to trace.GinNewExitSpan for producer spans.
	peerName = "No peer"
	// language is the value of the "producer.language"/"consumer.language" tags.
	language = "Go"
	// timeFormat is the layout used for every time-valued span tag.
	timeFormat = "2006-01-02 15:04:05"
)

// Span factory hooks. Both can be replaced through the Set* functions below;
// the interceptors skip tracing entirely when the relevant hook is nil.
var (
	// newSkyWalkingProducerExitSpan opens an exit span named
	// "RocketMQ/Producer/<topic>" and stores the propagation header in the
	// message property propagation.Header. It returns nil when no span could
	// be created.
	newSkyWalkingProducerExitSpan = func(ctx context.Context, msg *primitive.Message) go2sky.Span {
		span, err := trace.GinNewExitSpan(ctx, "RocketMQ/Producer/"+msg.Topic, peerName, func(header string) error {
			msg.WithProperty(propagation.Header, header)
			return nil
		})
		if err != nil || span == nil {
			return nil
		}
		return span
	}

	// newSkyWalkingConsumerEntrySpan opens an entry span named
	// "RocketMQ/Consumer/<topic>", reading the upstream context from the
	// message property propagation.Header. It returns a nil span when no span
	// could be created; the returned context is whatever
	// trace.GinNewEntrySpan produced.
	newSkyWalkingConsumerEntrySpan = func(ctx context.Context, msg *primitive.MessageExt) (go2sky.Span, context.Context) {
		span, ctx, err := trace.GinNewEntrySpan(ctx, "RocketMQ/Consumer/"+msg.Topic, func() (string, error) {
			return msg.GetProperty(propagation.Header), nil
		})
		if err != nil || span == nil {
			return nil, ctx
		}
		return span, ctx
	}
)

// SetNewSkyWalkingConsumerEntrySpan replaces the hook used to create consumer
// entry spans. Passing nil disables consumer tracing.
func SetNewSkyWalkingConsumerEntrySpan(foo func(ctx context.Context, msg *primitive.MessageExt) (go2sky.Span, context.Context)) {
	newSkyWalkingConsumerEntrySpan = foo
}

// SetNewSkyWalkingProducerExitSpan replaces the hook used to create producer
// exit spans. Passing nil disables producer tracing.
func SetNewSkyWalkingProducerExitSpan(foo func(ctx context.Context, msg *primitive.Message) go2sky.Span) {
	newSkyWalkingProducerExitSpan = foo
}

// milliseconds2String formats a Unix timestamp given in milliseconds with
// timeFormat in the local time zone. The sub-second part is discarded.
func milliseconds2String(milliseconds int64) string {
	return time.Unix(milliseconds/1000, 0).Format(timeFormat)
}

// GetMessageTraceId returns the trace ID decoded from the message's
// propagation.Header property, or "" when message is nil.
func GetMessageTraceId(message *primitive.MessageExt) string {
	if message == nil {
		return ""
	}
	refSc := &propagation.SpanContext{}
	// Best effort: the decode error is deliberately ignored and whatever
	// TraceID was populated (empty for a missing header) is returned.
	_ = refSc.DecodeSW8(message.GetProperty(propagation.Header))
	return refSc.TraceID
}

// getSkyWalkingConsumerTraceInterceptor returns a consumer interceptor that
// opens one entry span per message in the batch, invokes next, and ends all
// spans once next has returned.
//
// Requests that are not a []*primitive.MessageExt, or a nil span hook, make
// the interceptor a plain pass-through.
//
// NOTE(review): next is invoked with the original ctx, not the context
// returned by the span hook, so spans created inside the handler are not tied
// to these entry spans through ctx. Confirm whether that is intended.
func getSkyWalkingConsumerTraceInterceptor() func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		messages, ok := req.([]*primitive.MessageExt)
		if !ok || newSkyWalkingConsumerEntrySpan == nil {
			return next(ctx, req, reply)
		}

		spans := make([]go2sky.Span, 0, len(messages))
		// Registered before any span is created so that spans already opened
		// are still ended if a later hook call or next panics.
		defer func() {
			for _, span := range spans {
				span.End()
			}
		}()

		for _, message := range messages {
			if message == nil {
				// The span hook dereferences the message; skip holes in the batch.
				continue
			}
			if span, _ := newRocketMQConsumerSpan(ctx, message); span != nil {
				spans = append(spans, span)
			}
		}

		return next(ctx, req, reply)
	}
}

// newRocketMQConsumerSpan creates the entry span for a single consumed message
// and attaches the consumer tags. It returns a nil span (and the incoming ctx)
// when message or the span hook is nil, and a nil span when the hook yields
// none.
func newRocketMQConsumerSpan(ctx context.Context, message *primitive.MessageExt) (go2sky.Span, context.Context) {
	if message == nil || newSkyWalkingConsumerEntrySpan == nil {
		return nil, ctx
	}
	span, spanCtx := newSkyWalkingConsumerEntrySpan(ctx, message)
	if span == nil {
		return nil, spanCtx
	}

	span.SetComponent(rocketMQConsumer)
	span.Tag("mq.topic", message.Topic)
	span.Tag("mq.msg_id", message.MsgId)
	span.Tag("consumer.language", language)
	// NOTE(review): this is the message's StoreTimestamp; confirm that the
	// store time is what the "producer.time" tag is meant to carry.
	span.Tag("producer.time", milliseconds2String(message.StoreTimestamp))
	span.Tag("consumer.time", time.Now().Format(timeFormat))
	span.Tag("consumer.hostname", util.CurrentServerHostName())
	return span, spanCtx
}

// getSkyWalkingProducerTraceInterceptor returns a producer interceptor that
// wraps a send in an exit span.
//
// The component, topic and message tags are attached before next is invoked,
// so spans for failed sends (and for sends that produce no
// *primitive.SendResult) are still identifiable. An error from next is
// recorded on the span and returned unchanged. On success the fields of the
// send result are added as tags.
func getSkyWalkingProducerTraceInterceptor() func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		var span go2sky.Span

		// A typed-nil message passes the type assertion, so check it too:
		// the span hook dereferences msg.
		msg, ok := req.(*primitive.Message)
		if ok && msg != nil && newSkyWalkingProducerExitSpan != nil {
			span = newSkyWalkingProducerExitSpan(ctx, msg)
		}
		if span == nil {
			return next(ctx, req, reply)
		}
		defer span.End()

		span.SetComponent(rocketMQProducer)
		span.Tag("mq.topic", msg.Topic)
		if tags := msg.GetTags(); tags != "" {
			span.Tag("mq.tags", tags)
		}

		if err := next(ctx, req, reply); err != nil {
			span.Error(time.Now(), fmt.Sprintf("%v", err))
			return err
		}

		// A typed-nil result also passes the assertion; nothing to tag then.
		result, ok := reply.(*primitive.SendResult)
		if !ok || result == nil {
			return nil
		}

		span.Tag("mq.msg_id", result.MsgID)
		// MessageQueue is a pointer and may be unset; skip its tags rather
		// than panic.
		if queue := result.MessageQueue; queue != nil {
			span.Tag("mq.broker.name", queue.BrokerName)
			span.Tag("mq.queue", util.ToString(queue.QueueId))
		}
		span.Tag("mq.offset", util.ToString(result.QueueOffset))
		span.Tag("mq.status", util.ToString(result.Status))
		span.Tag("mq.trace", util.ToString(result.TraceOn))
		span.Tag("producer.language", language)
		span.Tag("producer.time", time.Now().Format(timeFormat))
		span.Tag("producer.hostname", util.CurrentServerHostName())
		return nil
	}
}