kafka_consumer.go

package kafka

import (
    "context"
    "errors"
    "fmt"
    "sync"

    "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
    "git.shuncheng.lu/bigthing/gocommon/pkg/internal/application"
    "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
    "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
    "github.com/Shopify/sarama"
)

var (
    // _kafkaConsumer holds the package-level consumer created by InitConsumer.
    _kafkaConsumer Consumer
)

// SetConsumerHandler registers the message handler on the package-level consumer created by InitConsumer.
func SetConsumerHandler(handler HandlerKafkaMessage) error {
    return _kafkaConsumer.SetConsumer(handler)
}

// kafkaConsumer owns the underlying kafka clients and shares a single handler across them.
type kafkaConsumer struct {
    clientList   []*kafkaClient
    isSetHandler bool
    sync.Mutex
}

// SetConsumer installs the handler on every client; it may only be called once.
func (c *kafkaConsumer) SetConsumer(handler HandlerKafkaMessage) error {
    c.Lock()
    defer c.Unlock()
    if c.isSetHandler {
        return errors.New("kafka consumer handler is already set")
    }
    if handler == nil {
        return errors.New("kafka consumer handler can not be nil")
    }
    for _, elem := range c.clientList {
        select {
        case <-elem.done:
            logger.Errorc(elem.ctx, "[Kafka] consumer can not set handler because the kafka client is closed")
        default:
            elem.handler = handler
            elem.isSetHandler <- 1
            logger.Infoc(elem.ctx, "[Kafka] consumer set handler success")
        }
    }
    c.isSetHandler = true
    return nil
}

// Consumer is the public surface of the kafka consumer.
type Consumer interface {
    SetConsumer(handler HandlerKafkaMessage) error
}

// HandlerKafkaMessage processes a single consumed message; a non-nil error is
// logged, but the message is still marked as handled.
type HandlerKafkaMessage func(ctx context.Context, msg *sarama.ConsumerMessage) error
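
// handleRawMessage is an illustrative HandlerKafkaMessage sketch (the name and
// behaviour are assumptions, not part of the package API): it only logs the raw
// payload; a real handler would decode msg.Value and dispatch on msg.Topic.
func handleRawMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
    logger.Infoc(ctx, "[Kafka] example handler, topic: %s, msg: %s", msg.Topic, msg.Value)
    return nil
}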

const (
    // kafkaTopicKeyPrefix prefixes the per-topic config section names.
    kafkaTopicKeyPrefix = "kafka_consumer_"
)

// kafkaConsumerTopicConfig describes a single consumed topic.
type kafkaConsumerTopicConfig struct {
    Topic      string
    ProcessNum int
}

// kafkaConsumerConfig is the full consumer configuration.
type kafkaConsumerConfig struct {
    kafkaClientConfig
    TopicConfigs []kafkaConsumerTopicConfig // per-topic configuration
}

// getKafkaConsumerTopicConfig loads one per-topic config section, e.g.:
//
//	[kafka_consumer_kpi_approve]
//	topic = op-approve-event-dev
//	process_num = 3
func getKafkaConsumerTopicConfig(key string, assertNil util.SetAndAssertNil) (*kafkaConsumerTopicConfig, error) {
    getKafkaConsumerKey := func(key string) string {
        return fmt.Sprintf("%s%s", kafkaTopicKeyPrefix, key)
    }
    config := make(map[string]string)
    key = getKafkaConsumerKey(key)
    if err := assertNil(config, key, "topic"); err != nil {
        return nil, err
    }
    if err := assertNil(config, key, "process_num", "1"); err != nil {
        return nil, err
    }
    return &kafkaConsumerTopicConfig{
        Topic:      config["topic"],
        ProcessNum: util.String2Int(config["process_num"]),
    }, nil
}

// getKafkaConsumerConfig loads the [kafka] section and every per-topic section it references.
func getKafkaConsumerConfig(assertNil util.SetAndAssertNil) (*kafkaConsumerConfig, error) {
    config := make(map[string]string)
    if err := assertNil(config, "kafka", "host"); err != nil {
        return nil, err
    }
    if err := assertNil(config, "kafka", "topic"); err != nil {
        return nil, err
    }
    if err := assertNil(config, "kafka", "log_path"); err != nil {
        return nil, err
    }
    if err := assertNil(config, "kafka", "group_name"); err != nil {
        return nil, err
    }
    host := util.SplitIgnoreSpace(config["host"], ",")
    if len(host) == 0 {
        return nil, errors.New("kafka host can not be empty")
    }
    topic := util.SplitIgnoreSpace(config["topic"], ",")
    if len(topic) == 0 {
        return nil, errors.New("kafka topics can not be empty")
    }
    realTopicName := make([]string, 0, len(topic))
    topicConfigs := make([]kafkaConsumerTopicConfig, 0, len(topic))
    for _, elem := range topic {
        topicConfig, err := getKafkaConsumerTopicConfig(elem, assertNil)
        if err != nil {
            return nil, err
        }
        realTopicName = append(realTopicName, topicConfig.Topic)
        topicConfigs = append(topicConfigs, *topicConfig)
    }
    return &kafkaConsumerConfig{
        kafkaClientConfig: kafkaClientConfig{
            Host:      host,
            GroupName: config["group_name"],
            LogPath:   config["log_path"],
            Topic:     realTopicName,
        },
        TopicConfigs: topicConfigs,
    }, nil
}
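
// For reference, a config sketch matching the keys read above (the host,
// log_path and group_name values are illustrative; each entry in kafka.topic
// names a [kafka_consumer_<key>] section):
//
//	[kafka]
//	host = 10.0.0.1:9092, 10.0.0.2:9092
//	topic = kpi_approve
//	log_path = /data/logs/kafka
//	group_name = bigthing-consumer
//
//	[kafka_consumer_kpi_approve]
//	topic = op-approve-event-dev
//	process_num = 3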

// InitConsumer loads the kafka configuration, creates the clients and starts consuming.
func InitConsumer() error {
    return startKafkaConsumer()
}

func startKafkaConsumer() error {
    config, err := getKafkaConsumerConfig(conf.SetAndAssertNil)
    if err != nil {
        return err
    }
    util.Debugf("[KafkaConsumer] load config success, config=%+v", config)
    consumer, err := initConsumer(config)
    if err != nil {
        return err
    }
    _kafkaConsumer = consumer
    return nil
}

// initConsumer creates ProcessNum clients for each topic config and starts them all.
func initConsumer(config *kafkaConsumerConfig) (Consumer, error) {
    kafkaClientList := make([]*kafkaClient, 0, len(config.TopicConfigs))
    for _, elem := range config.TopicConfigs {
        for x := 0; x < elem.ProcessNum; x++ {
            client, err := newKafkaClient(consumerClient, config.kafkaClientConfig)
            if err != nil {
                return nil, err
            }
            kafkaClientList = append(kafkaClientList, client)
        }
    }
    for _, client := range kafkaClientList {
        kafkaClientConsumerStart(client)
    }
    return &kafkaConsumer{
        clientList: kafkaClientList,
    }, nil
}

// kafkaClientConsumerStart starts two goroutines per client: one runs the
// underlying consumer, the other dispatches received messages to the handler.
func kafkaClientConsumerStart(kafkaClient *kafkaClient) {
    // consume from kafka
    go func() {
        defer func() {
            if err := recover(); err != nil {
                logger.Errorc(kafkaClient.ctx, "[Kafka] start kafka consumer panic, topic: %s, err: %v", kafkaClient.consumerTopics, err)
            }
        }()
        if err := kafkaClient.Consumer(); err != nil {
            logger.Errorc(kafkaClient.ctx, "[Kafka] start kafka consumer fail, topic: %s, err: %v", kafkaClient.consumerTopics, err)
        }
    }()
    // handle kafka messages
    go func() {
        defer func() {
            if err := recover(); err != nil {
                logger.Errorf("[Kafka] start kafka process panic, topic: %s, err: %v", kafkaClient.consumerTopics, err)
            }
        }()
        loopConsumerProcess(kafkaClient)
    }()
}

// loopConsumerProcess pulls messages from the client and hands them to the
// registered handler until the client is closed.
func loopConsumerProcess(kafkaClient *kafkaClient) {
    for {
        select {
        case msg := <-kafkaClient.GetMessage():
            ctx := application.NewContext()
            logger.Infoc(ctx, "[Kafka] consumer message start, msg: %s, topic: %s", msg.Value, msg.Topic)
            if err := kafkaClient.handler(ctx, msg); err != nil {
                logger.Errorc(ctx, "[Kafka] consumer process err, err: %v", err)
            }
            // mark the message as handled even if the handler returned an error
            kafkaClient.SetMessageIsHandle()
            logger.Infoc(ctx, "[Kafka] consumer end")
        // if the client is closed, stop consuming
        case <-kafkaClient.done:
            logger.Infof("[Kafka] kafka consumer exited...")
            // exit the loop
            return
        }
    }
}
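
// Usage sketch from the consuming application (illustrative; the calling code
// and its error handling are assumptions, only InitConsumer and
// SetConsumerHandler come from this file):
//
//	if err := kafka.InitConsumer(); err != nil {
//		panic(err)
//	}
//	err := kafka.SetConsumerHandler(func(ctx context.Context, msg *sarama.ConsumerMessage) error {
//		// business processing goes here
//		return nil
//	})
//	if err != nil {
//		panic(err)
//	}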