| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package rocketmq
- import (
- "context"
- "errors"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
- "github.com/apache/rocketmq-client-go/v2/consumer"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/producer"
- )
- // logger config
- var (
- infoc = logger.Infoc
- errorc = logger.Errorc
- loggerTraceIdKey = logger.TraceIdKey
- )
- func SetLoggerInfoc(foo func(ctx context.Context, format string, v ...interface{})) {
- infoc = foo
- }
- func SetLoggerErrorc(foo func(ctx context.Context, format string, v ...interface{})) {
- errorc = foo
- }
- func SetLoggerTraceIdKey(traceIdKey string) {
- loggerTraceIdKey = traceIdKey
- }
// Sentinel errors describing the started / not-started state of the
// consumer and the producer.
var (
	consumerNotStartError = errors.New("rocket-mq consumer is not start")
	consumerStartedError  = errors.New("rocket-mq consumer is started, can not operate")
	producerNotStartError = errors.New("rocket-mq producer is not start, can not operate")
	producerStartedError  = errors.New("rocket-mq producer is started, can not operate")
)
// configPrefix is the configuration section name used for every RocketMQ
// setting and as the prefix of the validation error messages.
const configPrefix = "rocket_mq"
- // 方便定制化配置,需要在启动前配置
- var (
- producerGlobalConfig = make([]producer.Option, 0)
- consumerGlobalConfig = make([]consumer.Option, 0)
- )
- func SetProducerConfig(config ...producer.Option) {
- producerGlobalConfig = append(producerGlobalConfig, config...)
- }
- func SetConsumerConfig(config ...consumer.Option) {
- consumerGlobalConfig = append(consumerGlobalConfig, config...)
- }
- // 方便定制化使用
- func GetRocketMQConfig(_type MQType, configFunc func(section, key string, defaultVal ...string) string) (*Config, error) {
- if configFunc == nil {
- return nil, errors.New("RocketMQ config con not be null")
- }
- config := func(key string, defaultValue ...string) string {
- return configFunc(configPrefix, key, defaultValue...)
- }
- // checked
- if config("host") == "" {
- return nil, errors.New(configPrefix + ".host can not be null")
- }
- if _type == ConsumerMQ {
- // checked
- if config("consumer.group_name") == "" {
- return nil, errors.New(configPrefix + ".consumer.group_name can not be null")
- }
- }
- if _type == ProducerMQ {
- // checked
- if config("producer.group_name") == "" {
- return nil, errors.New(configPrefix + ".producer.group_name can not be null")
- }
- }
- result := Config{
- MQType: _type,
- LogLevel: config("log.level", "info"),
- LogFileName: config("log.filename"),
- Host: util.SplitIgnoreSpace(config("host"), ","),
- ConsumerGroupName: config("consumer.group_name"),
- ProducerGroupName: config("producer.group_name"),
- TraceEnable: util.String2Bool(config("trace.enable", "true")),
- SkyWalkingEnable: util.String2Bool(config("skywalking.enable", "true")),
- }
- result.credentials = primitive.Credentials{
- AccessKey: config("access_key"),
- SecretKey: config("secret_key"),
- SecurityToken: config("security_token"),
- }
- result.traceConfig = primitive.TraceConfig{
- Credentials: result.credentials,
- Access: primitive.Local,
- NamesrvAddrs: result.Host,
- }
- return &result, nil
- }
// MQType selects the kind of RocketMQ client a configuration is built for.
type MQType uint8

// Client kinds. Numbering starts at 1, so the zero value of MQType matches
// neither constant.
const (
	ConsumerMQ MQType = 1
	ProducerMQ MQType = 2
)
- type Config struct {
- MQType MQType
- LogLevel string // 日志级别,debug|warn|error|info(default: info)
- LogFileName string // 日志位置,切割1day&最长保留时间30day
- Host []string // name-server地址
- ConsumerGroupName string // group-name
- ProducerGroupName string // group-name
- credentials primitive.Credentials // 密码/token相关
- SkyWalkingEnable bool // 是否开启skywalking-trace
- // 示例:https://tyut.oss-accelerate.aliyuncs.com/image/2021/3-11/7d3f0f33097743bd884e1c031cdafc76.png
- // 需要服务端开启trace-enable
- TraceEnable bool // 是否开启trace,默认是使用rocket-mq的trace的
- traceConfig primitive.TraceConfig // trace的配置
- }
- func (c Config) ToConsumerConfig() []consumer.Option {
- defaultOp := append([]consumer.Option{
- consumer.WithGroupName(c.ConsumerGroupName),
- consumer.WithNameServer(c.Host),
- consumer.WithCredentials(c.credentials),
- }, consumerGlobalConfig...)
- if c.TraceEnable {
- defaultOp = append(defaultOp, consumer.WithTrace(&c.traceConfig))
- }
- if c.SkyWalkingEnable {
- defaultOp = append(defaultOp, consumer.WithInterceptor(getSkyWalkingConsumerTraceInterceptor()))
- }
- return defaultOp
- }
- func (c Config) ToProducerConfig() []producer.Option {
- defaultOp := append([]producer.Option{
- producer.WithGroupName(c.ProducerGroupName),
- producer.WithNameServer(c.Host),
- producer.WithCredentials(c.credentials),
- }, producerGlobalConfig...)
- if c.TraceEnable {
- defaultOp = append(defaultOp, producer.WithTrace(&c.traceConfig))
- }
- if c.SkyWalkingEnable {
- defaultOp = append(defaultOp, producer.WithInterceptor(getSkyWalkingProducerTraceInterceptor()))
- }
- return defaultOp
- }
|