config.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package rocketmq
  2. import (
  3. "context"
  4. "errors"
  5. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  6. "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
  7. "github.com/apache/rocketmq-client-go/v2/consumer"
  8. "github.com/apache/rocketmq-client-go/v2/primitive"
  9. "github.com/apache/rocketmq-client-go/v2/producer"
  10. )
  11. // logger config
  12. var (
  13. infoc = logger.Infoc
  14. errorc = logger.Errorc
  15. loggerTraceIdKey = logger.TraceIdKey
  16. )
  17. func SetLoggerInfoc(foo func(ctx context.Context, format string, v ...interface{})) {
  18. infoc = foo
  19. }
  20. func SetLoggerErrorc(foo func(ctx context.Context, format string, v ...interface{})) {
  21. errorc = foo
  22. }
  23. func SetLoggerTraceIdKey(traceIdKey string) {
  24. loggerTraceIdKey = traceIdKey
  25. }
  26. var (
  27. consumerNotStartError = errors.New("rocket-mq consumer is not start")
  28. consumerStartedError = errors.New("rocket-mq consumer is started, con not operate")
  29. producerNotStartError = errors.New("rocket-mq producer is not start, con not operate")
  30. producerStartedError = errors.New("rocket-mq producer is started, con not operate")
  31. )
  32. const (
  33. configPrefix = "rocket_mq"
  34. )
  35. // 方便定制化配置,需要在启动前配置
  36. var (
  37. producerGlobalConfig = make([]producer.Option, 0)
  38. consumerGlobalConfig = make([]consumer.Option, 0)
  39. )
  40. func SetProducerConfig(config ...producer.Option) {
  41. producerGlobalConfig = append(producerGlobalConfig, config...)
  42. }
  43. func SetConsumerConfig(config ...consumer.Option) {
  44. consumerGlobalConfig = append(consumerGlobalConfig, config...)
  45. }
  46. // 方便定制化使用
  47. func GetRocketMQConfig(_type MQType, configFunc func(section, key string, defaultVal ...string) string) (*Config, error) {
  48. if configFunc == nil {
  49. return nil, errors.New("RocketMQ config con not be null")
  50. }
  51. config := func(key string, defaultValue ...string) string {
  52. return configFunc(configPrefix, key, defaultValue...)
  53. }
  54. // checked
  55. if config("host") == "" {
  56. return nil, errors.New(configPrefix + ".host can not be null")
  57. }
  58. if _type == ConsumerMQ {
  59. // checked
  60. if config("consumer.group_name") == "" {
  61. return nil, errors.New(configPrefix + ".consumer.group_name can not be null")
  62. }
  63. }
  64. if _type == ProducerMQ {
  65. // checked
  66. if config("producer.group_name") == "" {
  67. return nil, errors.New(configPrefix + ".producer.group_name can not be null")
  68. }
  69. }
  70. result := Config{
  71. MQType: _type,
  72. LogLevel: config("log.level", "info"),
  73. LogFileName: config("log.filename"),
  74. Host: util.SplitIgnoreSpace(config("host"), ","),
  75. ConsumerGroupName: config("consumer.group_name"),
  76. ProducerGroupName: config("producer.group_name"),
  77. TraceEnable: util.String2Bool(config("trace.enable", "true")),
  78. SkyWalkingEnable: util.String2Bool(config("skywalking.enable", "true")),
  79. }
  80. result.credentials = primitive.Credentials{
  81. AccessKey: config("access_key"),
  82. SecretKey: config("secret_key"),
  83. SecurityToken: config("security_token"),
  84. }
  85. result.traceConfig = primitive.TraceConfig{
  86. Credentials: result.credentials,
  87. Access: primitive.Local,
  88. NamesrvAddrs: result.Host,
  89. }
  90. return &result, nil
  91. }
  92. type MQType uint8
  93. const (
  94. ConsumerMQ MQType = iota + 1
  95. ProducerMQ
  96. )
  97. type Config struct {
  98. MQType MQType
  99. LogLevel string // 日志级别,debug|warn|error|info(default: info)
  100. LogFileName string // 日志位置,切割1day&最长保留时间30day
  101. Host []string // name-server地址
  102. ConsumerGroupName string // group-name
  103. ProducerGroupName string // group-name
  104. credentials primitive.Credentials // 密码/token相关
  105. SkyWalkingEnable bool // 是否开启skywalking-trace
  106. // 示例:https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/7d3f0f33097743bd884e1c031cdafc76.png
  107. // 需要服务端开启trace-enable
  108. TraceEnable bool // 是否开启trace,默认是使用rocket-mq的trace的
  109. traceConfig primitive.TraceConfig // trace的配置
  110. }
  111. func (c Config) ToConsumerConfig() []consumer.Option {
  112. defaultOp := append([]consumer.Option{
  113. consumer.WithGroupName(c.ConsumerGroupName),
  114. consumer.WithNameServer(c.Host),
  115. consumer.WithCredentials(c.credentials),
  116. }, consumerGlobalConfig...)
  117. if c.TraceEnable {
  118. defaultOp = append(defaultOp, consumer.WithTrace(&c.traceConfig))
  119. }
  120. if c.SkyWalkingEnable {
  121. defaultOp = append(defaultOp, consumer.WithInterceptor(getSkyWalkingConsumerTraceInterceptor()))
  122. }
  123. return defaultOp
  124. }
  125. func (c Config) ToProducerConfig() []producer.Option {
  126. defaultOp := append([]producer.Option{
  127. producer.WithGroupName(c.ProducerGroupName),
  128. producer.WithNameServer(c.Host),
  129. producer.WithCredentials(c.credentials),
  130. }, producerGlobalConfig...)
  131. if c.TraceEnable {
  132. defaultOp = append(defaultOp, producer.WithTrace(&c.traceConfig))
  133. }
  134. if c.SkyWalkingEnable {
  135. defaultOp = append(defaultOp, producer.WithInterceptor(getSkyWalkingProducerTraceInterceptor()))
  136. }
  137. return defaultOp
  138. }