| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- package kafka
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/application"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
- "github.com/Shopify/sarama"
- )
// _kafkaConsumer is the package-level consumer that SetConsumerHandler
// delegates to. It is assigned by startKafkaConsumer and keeps its zero value
// (nil) until that assignment happens.
var (
	_kafkaConsumer Consumer
)
- func SetConsumerHandler(handler HandlerKafkaMessage) error {
- return _kafkaConsumer.SetConsumer(handler)
- }
// kafkaConsumer is the Consumer implementation returned by initConsumer. It
// keeps the kafka clients it started and remembers whether a message handler
// has already been installed on them.
type kafkaConsumer struct {
	// clientList holds the clients that SetConsumer hands the handler to.
	clientList []*kafkaClient
	// isSetHandler becomes true once SetConsumer has completed successfully.
	isSetHandler bool
	// Mutex is locked for the whole duration of SetConsumer.
	sync.Mutex
}
- func (this *kafkaConsumer) SetConsumer(handler HandlerKafkaMessage) error {
- this.Lock()
- defer this.Unlock()
- if this.isSetHandler {
- return errors.New("kafka consumer handler already set error")
- }
- if handler == nil {
- return errors.New("kafka consumer handler can not set nil ")
- }
- for _, elem := range this.clientList {
- select {
- case <-elem.done:
- logger.Errorc(elem.ctx, "[Kafka] consumer can nor set handler because the kafka is closed")
- default:
- elem.handler = handler
- elem.isSetHandler <- 1
- logger.Infoc(elem.ctx, "[Kafka] consumer set handler success")
- }
- }
- this.isSetHandler = true
- return nil
- }
// Consumer is the abstraction over a kafka consumer that accepts a message
// handler.
type Consumer interface {
	// SetConsumer registers handler as the message callback and reports an
	// error when the registration is rejected.
	SetConsumer(handler HandlerKafkaMessage) error
}
// HandlerKafkaMessage is the callback signature used to process a single
// consumed kafka message. A non-nil error reports that processing failed.
type HandlerKafkaMessage func(ctx context.Context, msg *sarama.ConsumerMessage) error
// kafkaTopicKeyPrefix is prepended to a topic key to form the name of the
// configuration section that holds that topic's consumer settings.
const (
	kafkaTopicKeyPrefix = "kafka_consumer_"
)
// kafkaConsumerTopicConfig holds the settings read for one consumer topic
// section.
type kafkaConsumerTopicConfig struct {
	// Topic is the value of the section's "topic" entry.
	Topic string
	// ProcessNum is the value of the section's "process_num" entry; initConsumer
	// creates this many clients for the entry.
	ProcessNum int
}
// kafkaConsumerConfig is the complete consumer configuration: the shared
// client settings plus the per-topic settings.
type kafkaConsumerConfig struct {
	kafkaClientConfig
	TopicConfigs []kafkaConsumerTopicConfig // per-topic configuration
}
- /**
- [kafka_consumer_kpi_approve]
- topic = op-approve-event-dev
- process_num = 3
- */
- func getKafkaConsumerTopicConfig(key string, assertNil util.SetAndAssertNil) (*kafkaConsumerTopicConfig, error) {
- gekKafkaConsumerKey := func(key string) string {
- return fmt.Sprintf("%s%s", kafkaTopicKeyPrefix, key)
- }
- config := make(map[string]string, 0)
- key = gekKafkaConsumerKey(key)
- if err := assertNil(config, key, "topic"); err != nil {
- return nil, err
- }
- if err := assertNil(config, key, "process_num", "1"); err != nil {
- return nil, err
- }
- return &kafkaConsumerTopicConfig{
- Topic: config["topic"],
- ProcessNum: util.String2Int(config["process_num"]),
- }, nil
- }
- func getKafkaConsumerConfig(assertNil util.SetAndAssertNil) (*kafkaConsumerConfig, error) {
- config := make(map[string]string, 0)
- if err := assertNil(config, "kafka", "host"); err != nil {
- return nil, err
- }
- if err := assertNil(config, "kafka", "topic"); err != nil {
- return nil, err
- }
- if err := assertNil(config, "kafka", "log_path"); err != nil {
- return nil, err
- }
- if err := assertNil(config, "kafka", "group_name"); err != nil {
- return nil, err
- }
- host := util.SplitIgnoreSpace(config["host"], ",")
- if host == nil || len(host) == 0 {
- return nil, errors.New("kafka host can not empty")
- }
- topic := util.SplitIgnoreSpace(config["topic"], ",")
- if topic == nil || len(topic) == 0 {
- return nil, errors.New("kafka topics can not empty")
- }
- realTopicName := make([]string, 0, len(topic))
- topicConfigs := make([]kafkaConsumerTopicConfig, 0, len(topic))
- for _, elem := range topic {
- topicConfig, err := getKafkaConsumerTopicConfig(elem, assertNil)
- if err != nil {
- return nil, err
- }
- realTopicName = append(realTopicName, topicConfig.Topic)
- topicConfigs = append(topicConfigs, *topicConfig)
- }
- return &kafkaConsumerConfig{
- kafkaClientConfig: kafkaClientConfig{
- Host: host,
- GroupName: config["group_name"],
- LogPath: config["log_path"],
- Topic: realTopicName,
- },
- TopicConfigs: topicConfigs,
- }, nil
- }
// InitConsumer is the exported entry point for setting up the kafka consumer.
// It delegates to startKafkaConsumer and returns its error unchanged.
func InitConsumer() error {
	return startKafkaConsumer()
}
- func startKafkaConsumer() error {
- config, err := getKafkaConsumerConfig(conf.SetAndAssertNil)
- if err != nil {
- return err
- }
- util.Debugf("[KafkaConsumer] load config success, config=%+v", config)
- consumer, err := initConsumer(config)
- if err != nil {
- return err
- }
- _kafkaConsumer = consumer
- return nil
- }
- func initConsumer(config *kafkaConsumerConfig) (Consumer, error) {
- kafkaClientList := make([]*kafkaClient, 0, len(config.TopicConfigs))
- for _, elem := range config.TopicConfigs {
- for x := 0; x < elem.ProcessNum; x++ {
- kafkaClient, err := newKafkaClient(consumerClient, config.kafkaClientConfig)
- if err != nil {
- return nil, err
- }
- kafkaClientList = append(kafkaClientList, kafkaClient)
- }
- }
- for _, kafkaClient := range kafkaClientList {
- kafkaClientConsumerStart(kafkaClient)
- }
- return &kafkaConsumer{
- clientList: kafkaClientList,
- }, nil
- }
- func kafkaClientConsumerStart(kafkaClient *kafkaClient) {
- go func() {
- defer func() {
- if err := recover(); err != nil {
- logger.Errorc(kafkaClient.ctx, "[Kafka] start kafka consumer find panic, topic: %s, err: %v", kafkaClient.consumerTopics, err)
- }
- }()
- err := kafkaClient.Consumer()
- if err != nil {
- logger.Errorc(kafkaClient.ctx, "[Kafka] start kafka consumer fail, topic: %s, err: %v", kafkaClient.consumerTopics, err)
- }
- }()
- //处理kafka消息
- go func() {
- defer func() {
- if err := recover(); err != nil {
- logger.Errorf("[Kafka] start kafka process find panic, topic: %s, err: %v", kafkaClient.consumerTopics, err)
- }
- }()
- loopConsumerProcess(kafkaClient)
- }()
- }
- func loopConsumerProcess(kafkaClient *kafkaClient) {
- for {
- select {
- case msg := <-kafkaClient.GetMessage():
- ctx := application.NewContext()
- logger.Infoc(ctx, "[Kafka] consumer message start, msg: %s, topic: %s", msg.Value, msg.Topic)
- err := kafkaClient.handler(ctx, msg)
- if err != nil {
- logger.Errorc(ctx, "[Kafka] consumer process err,err:%v", err)
- }
- kafkaClient.SetMessageIsHandle()
- logger.Infoc(ctx, "[Kafka] consumer end")
- //如果客户端关闭,则不需要消费
- case <-kafkaClient.done:
- logger.Infof("[Kafka] kafka consumer exited...")
- // 退出循环
- return
- }
- }
- }
|