package rocketmq

import (
	"context"
	"errors"

	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
	"git.shuncheng.lu/bigthing/gocommon/pkg/logger"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// Logger configuration. These package-level hooks default to the shared
// logger package and can be swapped out through the setters below.
var (
	infoc            = logger.Infoc
	errorc           = logger.Errorc
	loggerTraceIdKey = logger.TraceIdKey
)

// SetLoggerInfoc replaces the package-level infoc logging function.
// No synchronization is performed here.
func SetLoggerInfoc(foo func(ctx context.Context, format string, v ...interface{})) {
	infoc = foo
}

// SetLoggerErrorc replaces the package-level errorc logging function.
// No synchronization is performed here.
func SetLoggerErrorc(foo func(ctx context.Context, format string, v ...interface{})) {
	errorc = foo
}

// SetLoggerTraceIdKey replaces the package-level trace-id key.
// No synchronization is performed here.
func SetLoggerTraceIdKey(traceIdKey string) {
	loggerTraceIdKey = traceIdKey
}

// Sentinel errors describing the start state of a consumer or producer.
var (
	consumerNotStartError = errors.New("rocket-mq consumer is not start")
	consumerStartedError  = errors.New("rocket-mq consumer is started, can not operate")
	producerNotStartError = errors.New("rocket-mq producer is not start, can not operate")
	producerStartedError  = errors.New("rocket-mq producer is started, can not operate")
)

const (
	// configPrefix is the section name handed to the caller-supplied config
	// function for every lookup made by GetRocketMQConfig.
	configPrefix = "rocket_mq"
)

// Hooks for customized configuration; they need to be set before startup.
// The options collected here are appended after the defaults built by
// ToProducerConfig / ToConsumerConfig.
var (
	producerGlobalConfig = make([]producer.Option, 0)
	consumerGlobalConfig = make([]consumer.Option, 0)
)

// SetProducerConfig appends the given options to the global producer option
// list. No synchronization is performed here.
func SetProducerConfig(config ...producer.Option) {
	producerGlobalConfig = append(producerGlobalConfig, config...)
}

// SetConsumerConfig appends the given options to the global consumer option
// list. No synchronization is performed here.
func SetConsumerConfig(config ...consumer.Option) {
	consumerGlobalConfig = append(consumerGlobalConfig, config...)
}

// GetRocketMQConfig builds a Config for the given client type, for customized
// use. Every value is read through configFunc using the "rocket_mq" section.
//
// An error is returned when configFunc is nil, when "host" is empty or yields
// no addresses, when _type is ConsumerMQ and "consumer.group_name" is empty,
// or when _type is ProducerMQ and "producer.group_name" is empty. Any other
// _type value skips the group-name check.
//
// "log.level" is requested with the default "info"; "trace.enable" and
// "skywalking.enable" are requested with the default "true".
func GetRocketMQConfig(_type MQType, configFunc func(section, key string, defaultVal ...string) string) (*Config, error) {
	if configFunc == nil {
		return nil, errors.New("RocketMQ config can not be null")
	}
	// config binds the fixed section so the lookups below only name the key.
	config := func(key string, defaultValue ...string) string {
		return configFunc(configPrefix, key, defaultValue...)
	}

	// checked
	host := config("host")
	if host == "" {
		return nil, errors.New(configPrefix + ".host can not be null")
	}

	// Each group name is looked up once and reused for both validation and
	// the resulting Config.
	consumerGroupName := config("consumer.group_name")
	producerGroupName := config("producer.group_name")
	switch _type {
	case ConsumerMQ:
		// checked
		if consumerGroupName == "" {
			return nil, errors.New(configPrefix + ".consumer.group_name can not be null")
		}
	case ProducerMQ:
		// checked
		if producerGroupName == "" {
			return nil, errors.New(configPrefix + ".producer.group_name can not be null")
		}
	}

	// A non-empty host string can still contain no usable address (for
	// example only separators); reject it here rather than handing the
	// client an empty name-server list.
	hosts := util.SplitIgnoreSpace(host, ",")
	if len(hosts) == 0 {
		return nil, errors.New(configPrefix + ".host can not be null")
	}

	result := Config{
		MQType:            _type,
		LogLevel:          config("log.level", "info"),
		LogFileName:       config("log.filename"),
		Host:              hosts,
		ConsumerGroupName: consumerGroupName,
		ProducerGroupName: producerGroupName,
		TraceEnable:       util.String2Bool(config("trace.enable", "true")),
		SkyWalkingEnable:  util.String2Bool(config("skywalking.enable", "true")),
	}
	result.credentials = primitive.Credentials{
		AccessKey:     config("access_key"),
		SecretKey:     config("secret_key"),
		SecurityToken: config("security_token"),
	}
	// The trace configuration reuses the same credentials and name servers.
	result.traceConfig = primitive.TraceConfig{
		Credentials:  result.credentials,
		Access:       primitive.Local,
		NamesrvAddrs: result.Host,
	}
	return &result, nil
}

// MQType identifies which kind of client a Config was built for.
type MQType uint8

const (
	// ConsumerMQ selects consumer validation in GetRocketMQConfig (value 1).
	ConsumerMQ MQType = iota + 1
	// ProducerMQ selects producer validation in GetRocketMQConfig (value 2).
	ProducerMQ
)

// Config carries the RocketMQ settings assembled by GetRocketMQConfig.
type Config struct {
	// MQType is the client type this configuration was built for.
	MQType MQType

	// LogLevel is the log level: debug|warn|error|info (default: info).
	LogLevel string

	// LogFileName is the log location; rotated every 1 day, kept for at most
	// 30 days.
	LogFileName string

	// Host holds the name-server addresses.
	Host []string

	// ConsumerGroupName is the consumer group name.
	ConsumerGroupName string

	// ProducerGroupName is the producer group name.
	ProducerGroupName string

	// credentials holds the key/secret/token settings.
	credentials primitive.Credentials

	// SkyWalkingEnable reports whether the SkyWalking trace interceptor is
	// added.
	SkyWalkingEnable bool

	// TraceEnable reports whether trace is enabled; by default RocketMQ's own
	// trace is used. Trace must also be enabled on the server side.
	// Example: https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/7d3f0f33097743bd884e1c031cdafc76.png
	TraceEnable bool

	// traceConfig is the trace configuration.
	traceConfig primitive.TraceConfig
}

// ToConsumerConfig returns the consumer options for c, in this order: group
// name, name servers and credentials; then every option registered through
// SetConsumerConfig; then the trace option when TraceEnable is set; then the
// SkyWalking interceptor when SkyWalkingEnable is set.
func (c Config) ToConsumerConfig() []consumer.Option {
	// A fresh slice is built on every call, so the global list is never
	// modified here.
	options := make([]consumer.Option, 0, len(consumerGlobalConfig)+5)
	options = append(options,
		consumer.WithGroupName(c.ConsumerGroupName),
		consumer.WithNameServer(c.Host),
		consumer.WithCredentials(c.credentials),
	)
	options = append(options, consumerGlobalConfig...)
	if c.TraceEnable {
		// c is a copy (value receiver); the pointer refers to that copy.
		options = append(options, consumer.WithTrace(&c.traceConfig))
	}
	if c.SkyWalkingEnable {
		options = append(options, consumer.WithInterceptor(getSkyWalkingConsumerTraceInterceptor()))
	}
	return options
}

// ToProducerConfig returns the producer options for c, in this order: group
// name, name servers and credentials; then every option registered through
// SetProducerConfig; then the trace option when TraceEnable is set; then the
// SkyWalking interceptor when SkyWalkingEnable is set.
func (c Config) ToProducerConfig() []producer.Option {
	// A fresh slice is built on every call, so the global list is never
	// modified here.
	options := make([]producer.Option, 0, len(producerGlobalConfig)+5)
	options = append(options,
		producer.WithGroupName(c.ProducerGroupName),
		producer.WithNameServer(c.Host),
		producer.WithCredentials(c.credentials),
	)
	options = append(options, producerGlobalConfig...)
	if c.TraceEnable {
		// c is a copy (value receiver); the pointer refers to that copy.
		options = append(options, producer.WithTrace(&c.traceConfig))
	}
	if c.SkyWalkingEnable {
		options = append(options, producer.WithInterceptor(getSkyWalkingProducerTraceInterceptor()))
	}
	return options
}