package kafka

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"git.shuncheng.lu/bigthing/gocommon/pkg/conf"
	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/application"
	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
	"git.shuncheng.lu/bigthing/gocommon/pkg/logger"
	"github.com/Shopify/sarama"
)

var (
	// _kafkaConsumer is the package-level consumer. It is assigned by
	// startKafkaConsumer and stays nil until that call succeeds.
	_kafkaConsumer Consumer
)

// Compile-time check that *kafkaConsumer satisfies Consumer.
var _ Consumer = (*kafkaConsumer)(nil)

// SetConsumerHandler installs handler on the package-level consumer.
//
// It returns an error when the consumer has not been initialized yet (instead
// of panicking on a nil interface), and otherwise whatever SetConsumer returns.
func SetConsumerHandler(handler HandlerKafkaMessage) error {
	if _kafkaConsumer == nil {
		return errors.New("kafka consumer is not initialized, call InitConsumer first")
	}
	return _kafkaConsumer.SetConsumer(handler)
}

// kafkaConsumer is the Consumer implementation backed by a list of kafka
// clients. The embedded mutex serializes SetConsumer calls.
type kafkaConsumer struct {
	clientList   []*kafkaClient
	isSetHandler bool
	sync.Mutex
}

// SetConsumer assigns handler to every client in clientList and signals each
// of them through its isSetHandler channel.
//
// It fails if a handler was already set or if handler is nil. Clients whose
// done channel is ready are skipped with an error log; they do not make the
// call fail.
func (c *kafkaConsumer) SetConsumer(handler HandlerKafkaMessage) error {
	c.Lock()
	defer c.Unlock()

	if c.isSetHandler {
		return errors.New("kafka consumer handler already set error")
	}
	if handler == nil {
		return errors.New("kafka consumer handler can not set nil ")
	}

	for _, elem := range c.clientList {
		select {
		case <-elem.done:
			logger.Errorc(elem.ctx, "[Kafka] consumer can nor set handler because the kafka is closed")
		default:
			elem.handler = handler
			// NOTE(review): this send happens while the lock is held; if the
			// channel is unbuffered and nothing is receiving it will block
			// here — confirm against kafkaClient.
			elem.isSetHandler <- 1
			logger.Infoc(elem.ctx, "[Kafka] consumer set handler success")
		}
	}

	c.isSetHandler = true
	return nil
}

// Consumer is the contract exposed by the kafka consumer: it accepts the
// message handler to run for consumed messages.
type Consumer interface {
	SetConsumer(handler HandlerKafkaMessage) error
}

// HandlerKafkaMessage processes a single consumed kafka message.
type HandlerKafkaMessage func(ctx context.Context, msg *sarama.ConsumerMessage) error

const (
	// kafkaTopicKeyPrefix is prepended to a topic key to build the name of
	// the per-topic config section.
	kafkaTopicKeyPrefix = "kafka_consumer_"
)

// kafkaConsumerTopicConfig holds the settings read from one per-topic section.
type kafkaConsumerTopicConfig struct {
	Topic      string
	ProcessNum int
}

// kafkaConsumerConfig is the full consumer configuration: the shared client
// settings plus one entry per configured topic.
type kafkaConsumerConfig struct {
	kafkaClientConfig
	TopicConfigs []kafkaConsumerTopicConfig // per-topic configuration
}

// getKafkaConsumerTopicConfig loads the per-topic section named
// kafkaTopicKeyPrefix+key using assertNil.
//
// Example section:
//
//	[kafka_consumer_kpi_approve]
//	topic = op-approve-event-dev
//	process_num = 3
//
// "topic" is requested without a trailing argument; "process_num" is requested
// with "1" (presumably the default value — confirm against
// util.SetAndAssertNil).
// NOTE(review): ProcessNum is not validated here; a value below 1 means
// initConsumer creates no client for this entry.
func getKafkaConsumerTopicConfig(key string, assertNil util.SetAndAssertNil) (*kafkaConsumerTopicConfig, error) {
	config := make(map[string]string)
	section := kafkaTopicKeyPrefix + key

	if err := assertNil(config, section, "topic"); err != nil {
		return nil, err
	}
	if err := assertNil(config, section, "process_num", "1"); err != nil {
		return nil, err
	}

	return &kafkaConsumerTopicConfig{
		Topic:      config["topic"],
		ProcessNum: util.String2Int(config["process_num"]),
	}, nil
}

// getKafkaConsumerConfig loads the [kafka] section (host, topic, log_path,
// group_name) and then the per-topic section of every entry listed in "topic".
//
// "host" and "topic" are comma-separated lists and must not be empty. The
// entries of "topic" are config keys; the real topic names come from the
// per-topic sections and are what ends up in kafkaClientConfig.Topic.
func getKafkaConsumerConfig(assertNil util.SetAndAssertNil) (*kafkaConsumerConfig, error) {
	config := make(map[string]string)
	// Load the keys in a fixed order; the first failure is returned.
	for _, name := range []string{"host", "topic", "log_path", "group_name"} {
		if err := assertNil(config, "kafka", name); err != nil {
			return nil, err
		}
	}

	host := util.SplitIgnoreSpace(config["host"], ",")
	if len(host) == 0 {
		return nil, errors.New("kafka host can not empty")
	}
	topic := util.SplitIgnoreSpace(config["topic"], ",")
	if len(topic) == 0 {
		return nil, errors.New("kafka topics can not empty")
	}

	realTopicName := make([]string, 0, len(topic))
	topicConfigs := make([]kafkaConsumerTopicConfig, 0, len(topic))
	for _, elem := range topic {
		topicConfig, err := getKafkaConsumerTopicConfig(elem, assertNil)
		if err != nil {
			return nil, err
		}
		realTopicName = append(realTopicName, topicConfig.Topic)
		topicConfigs = append(topicConfigs, *topicConfig)
	}

	return &kafkaConsumerConfig{
		kafkaClientConfig: kafkaClientConfig{
			Host:      host,
			GroupName: config["group_name"],
			LogPath:   config["log_path"],
			Topic:     realTopicName,
		},
		TopicConfigs: topicConfigs,
	}, nil
}

// InitConsumer loads the consumer configuration, starts the consumer clients
// and stores the resulting consumer in the package-level variable.
func InitConsumer() error {
	return startKafkaConsumer()
}

// startKafkaConsumer reads the configuration through conf.SetAndAssertNil,
// builds the consumer and publishes it as _kafkaConsumer.
func startKafkaConsumer() error {
	config, err := getKafkaConsumerConfig(conf.SetAndAssertNil)
	if err != nil {
		return err
	}
	util.Debugf("[KafkaConsumer] load config success, config=%+v", config)

	consumer, err := initConsumer(config)
	if err != nil {
		return err
	}
	_kafkaConsumer = consumer
	return nil
}

// initConsumer creates ProcessNum clients for every entry of
// config.TopicConfigs, starts all of them and returns the wrapping consumer.
//
// Every client is built from the same shared config.kafkaClientConfig. Nothing
// is started unless all clients were created successfully.
// NOTE(review): clients created before a failing newKafkaClient call are not
// released here — confirm whether kafkaClient needs an explicit close.
func initConsumer(config *kafkaConsumerConfig) (Consumer, error) {
	kafkaClientList := make([]*kafkaClient, 0, len(config.TopicConfigs))
	for _, elem := range config.TopicConfigs {
		for x := 0; x < elem.ProcessNum; x++ {
			client, err := newKafkaClient(consumerClient, config.kafkaClientConfig)
			if err != nil {
				return nil, err
			}
			kafkaClientList = append(kafkaClientList, client)
		}
	}

	for _, client := range kafkaClientList {
		kafkaClientConsumerStart(client)
	}

	return &kafkaConsumer{
		clientList: kafkaClientList,
	}, nil
}

// kafkaClientConsumerStart launches two goroutines for kafkaClient: one runs
// kafkaClient.Consumer(), the other runs the message processing loop. Panics
// and errors in either goroutine are logged, not propagated.
func kafkaClientConsumerStart(kafkaClient *kafkaClient) {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Errorc(kafkaClient.ctx, "[Kafka] start kafka consumer find panic, topic: %s, err: %v", kafkaClient.consumerTopics, err)
			}
		}()
		err := kafkaClient.Consumer()
		if err != nil {
			logger.Errorc(kafkaClient.ctx, "[Kafka] start kafka consumer fail, topic: %s, err: %v", kafkaClient.consumerTopics, err)
		}
	}()

	// Process kafka messages.
	go func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Errorf("[Kafka] start kafka process find panic, topic: %s, err: %v", kafkaClient.consumerTopics, err)
			}
		}()
		loopConsumerProcess(kafkaClient)
	}()
}

// loopConsumerProcess receives messages from kafkaClient.GetMessage(), runs
// the client's handler on each one under a fresh application context and then
// calls kafkaClient.SetMessageIsHandle(), whether or not the handler
// succeeded. It returns when kafkaClient.done is ready or when a nil message
// is received (for example because the message channel was closed).
//
// A panic inside the handler is converted into an error and logged like any
// other handler error, so a single bad message no longer terminates the loop.
func loopConsumerProcess(kafkaClient *kafkaClient) {
	for {
		select {
		case msg, ok := <-kafkaClient.GetMessage():
			if !ok || msg == nil {
				// Nothing usable can be read any more; stop instead of
				// dereferencing a nil message.
				logger.Errorf("[Kafka] kafka consumer received nil message or message channel closed, exited...")
				return
			}
			ctx := application.NewContext()
			logger.Infoc(ctx, "[Kafka] consumer message start, msg: %s, topic: %s", msg.Value, msg.Topic)
			err := invokeConsumerHandler(ctx, kafkaClient.handler, msg)
			if err != nil {
				logger.Errorc(ctx, "[Kafka] consumer process err,err:%v", err)
			}
			kafkaClient.SetMessageIsHandle()
			logger.Infoc(ctx, "[Kafka] consumer end")
		// The client has been closed, so there is nothing left to consume.
		case <-kafkaClient.done:
			logger.Infof("[Kafka] kafka consumer exited...")
			// Leave the loop.
			return
		}
	}
}

// invokeConsumerHandler calls handler with ctx and msg and returns its error.
// A panic raised by handler (including calling a nil handler) is recovered and
// returned as an error instead of unwinding into the caller.
func invokeConsumerHandler(ctx context.Context, handler HandlerKafkaMessage, msg *sarama.ConsumerMessage) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("kafka consumer handler panic: %v", r)
		}
	}()
	return handler(ctx, msg)
}