kafka_client.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package kafka
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "os"
  7. "sync"
  8. "time"
  9. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/application"
  10. "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
  11. "github.com/Shopify/sarama"
  12. )
// clientType distinguishes the two roles a kafkaClient can play.
type clientType uint8

const (
	// producerClient marks a client built for producing messages.
	producerClient clientType = iota + 1
	// consumerClient marks a client built for consuming messages.
	consumerClient
)

var (
	// producerIsDoneError is returned by Producer after the client has been closed.
	producerIsDoneError = errors.New("the kafka client is done, can not producer message")
)

// kafkaClientConfig carries the settings needed to construct a kafkaClient.
type kafkaClientConfig struct {
	// Host lists the Kafka broker addresses.
	Host []string
	// GroupName is the consumer group id (also reused as the sarama ClientID).
	GroupName string
	// LogPath is the file path the shared sarama logger appends to.
	LogPath string
	// Topic lists the topics to consume (consumer clients only).
	Topic []string
}

var (
	// Shared sarama logger, initialized lazily exactly once;
	// saramaLoggerInitLock guards the nil check plus assignment.
	saramaLogger         sarama.StdLogger
	saramaLoggerInitLock sync.Mutex
)
// kafkaClient wraps either a sarama sync producer or a consumer-group
// consumer, depending on clientType. It is built via newKafkaClient.
type kafkaClient struct {
	// Client role: producer or consumer.
	clientType clientType
	// Context used for logging and as the parent of the consume context.
	ctx context.Context
	// Consumer side: received messages are handed to business logic
	// through this channel (see AddMessage/GetMessage).
	msg chan *sarama.ConsumerMessage
	// Signals that the current message has been fully handled
	// (see SetMessageIsHandle/WaitMessageIsHandle).
	msgIsHandle chan int
	// Message handler; consumption waits until one has been set.
	handler HandlerKafkaMessage
	// Unblocks the consuming goroutine once a handler has been set
	// (received from in Consumer; closed by Close). NOTE(review): the
	// setter itself is not visible in this file — confirm who signals it.
	isSetHandler chan int
	// Closed by Setup when a consumer session is up; re-created for each
	// session because a channel must not be closed twice.
	ready chan bool
	// Closed by Close to broadcast shutdown.
	done chan bool
	// Topics this consumer subscribes to.
	consumerTopics []string
	// Consumer group id.
	groupName string
	// Producer side: the underlying synchronous producer.
	producer sarama.SyncProducer
	// Broker addresses.
	brokers []string
	// Log file path (kept for reference; the logger is package-global).
	logDirPath string
	// sarama configuration shared by producer/consumer setup.
	config *sarama.Config
}
  63. //KafkaClient构造函数
  64. func newKafkaClient(clientType clientType, kafkaClientConfig kafkaClientConfig) (*kafkaClient, error) {
  65. // 初始化日志,只需要加载一次
  66. err := func() error {
  67. saramaLoggerInitLock.Lock()
  68. defer saramaLoggerInitLock.Unlock()
  69. if saramaLogger == nil {
  70. file, err := os.OpenFile(kafkaClientConfig.LogPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
  71. if err != nil {
  72. return err
  73. }
  74. saramaLogger = log.New(file, "[Sarama] ", log.LstdFlags)
  75. sarama.Logger = saramaLogger
  76. }
  77. return nil
  78. }()
  79. if err != nil {
  80. return nil, err
  81. }
  82. client := kafkaClient{
  83. clientType: clientType,
  84. msg: make(chan *sarama.ConsumerMessage),
  85. done: make(chan bool),
  86. brokers: kafkaClientConfig.Host,
  87. groupName: kafkaClientConfig.GroupName,
  88. logDirPath: kafkaClientConfig.LogPath,
  89. ctx: application.NewContext(),
  90. isSetHandler: make(chan int),
  91. ready: make(chan bool),
  92. msgIsHandle: make(chan int),
  93. }
  94. configKafka := sarama.NewConfig()
  95. // 配置文件
  96. configKafka.Version = sarama.V0_11_0_1 //线上运行版本
  97. configKafka.ClientID = kafkaClientConfig.GroupName
  98. configKafka.Metadata.Full = false
  99. // 有些情况下是不需要初始化 生产者的,比如消费者,所以需要做判断
  100. switch clientType {
  101. case producerClient:
  102. configKafka.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
  103. configKafka.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
  104. configKafka.Producer.Return.Successes = true
  105. configKafka.Producer.Compression = sarama.CompressionSnappy
  106. configKafka.Producer.Flush.Frequency = 500 * time.Millisecond
  107. kafkaProducer, err := sarama.NewSyncProducer(kafkaClientConfig.Host, configKafka)
  108. if err != nil {
  109. return nil, err
  110. }
  111. client.producer = kafkaProducer
  112. case consumerClient:
  113. configKafka.Consumer.Offsets.Initial = sarama.OffsetNewest
  114. configKafka.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
  115. client.consumerTopics = kafkaClientConfig.Topic
  116. default:
  117. return nil, errors.New("kafka not support other type")
  118. }
  119. client.config = configKafka
  120. logger.Infoc(client.ctx, "[Kafka] new kafka client success, info=%+v", client)
  121. return &client, nil
  122. }
  123. // 不需要携带key
  124. func (this *kafkaClient) ProducerMsg(ctx context.Context, topic, message string) error {
  125. return this.Producer(ctx, topic, message, "")
  126. }
  127. //消息message string
  128. func (this *kafkaClient) Producer(ctx context.Context, topic, message string, key string) error {
  129. select {
  130. case <-this.done: // 如果关闭,抛出异常
  131. logger.Errorc(ctx, "[Kafka] produce message err, topic: %s, message: %s, err: %t", topic, message, producerIsDoneError)
  132. return producerIsDoneError
  133. default:
  134. // 默认则可以发送消息
  135. msg := sarama.ProducerMessage{
  136. Topic: topic,
  137. Timestamp: time.Now(),
  138. }
  139. msg.Value = sarama.StringEncoder(message)
  140. // 设置key,会使得Partition不均匀,推荐不设置
  141. if key != "" {
  142. msg.Key = sarama.StringEncoder(key)
  143. }
  144. partition, offset, err := this.producer.SendMessage(&msg)
  145. if err != nil {
  146. logger.Errorc(ctx, "[Kafka] produce message err, topic: %s, message: %s, partition: %d, offset: %d, key:%s, err: %v", topic, message, partition, offset, key, err)
  147. return err
  148. }
  149. logger.Infoc(ctx, "[Kafka] produce message success, topic: %s, message: %s, partition: %d, offset: %d, key:%s", topic, message, partition, offset, key)
  150. return nil
  151. }
  152. }
// Consumer joins the consumer group and consumes the configured topics until
// the parent context is cancelled or Close is called. It blocks for the
// lifetime of the consumer and returns any group-creation or close error.
func (this *kafkaClient) Consumer() error {
	client, err := sarama.NewConsumerGroup(this.brokers, this.groupName, this.config)
	if err != nil {
		return err
	}
	logger.Infoc(this.ctx, "[Kafka] new consumer group success")
	wg := &sync.WaitGroup{}
	wg.Add(1)
	ctx, cancel := context.WithCancel(this.ctx)
	go func() {
		// Do not consume until a message handler has been set.
		<-this.isSetHandler
		// Every goroutine must recover, to keep a panic from taking down the process.
		defer func() {
			wg.Done()
			if err := recover(); err != nil {
				logger.Errorc(ctx, "[Kafka] consume panic, err:%v", err)
			}
		}()
		for {
			logger.Infoc(ctx, "[Kafka] start consume")
			/**
			Note: after ctx is cancelled this loop would otherwise spin forever,
			because a cancelled context's Done channel stays closed and fires on
			every iteration. ctx.Err() is therefore checked below to break out.
			*/
			if err := client.Consume(ctx, this.consumerTopics, this); err != nil { // a rebalance does not surface as an error here
				logger.Errorc(ctx, "[kafka consumer] err from consumer:%s", err.Error())
				return
			}
			if ctx.Err() != nil {
				logger.Errorc(ctx, "[Kafka consumer] client.Consume err:%v", ctx.Err())
				return
			}
			// Re-create ready each round: Setup closed it, and a closed
			// channel must never be closed (or written to) again.
			this.ready = make(chan bool)
		}
	}()
	<-this.ready // Await till the consumer has been set up
	logger.Infoc(this.ctx, "[Kafka] consumer up and running !")
	// Block here until shutdown is requested.
	select {
	// Fires when the parent context gets cancelled by the caller.
	case <-ctx.Done():
		logger.Infoc(this.ctx, "[Kafka] consumer terminating: context cancelled")
	// Fires when Close is called (it closes this.done).
	case <-this.done:
		logger.Infoc(this.ctx, "[Kafka] consumer terminating: via signal")
	}
	// TODO(review): when shutdown came via Close, cancel() may be redundant — confirm.
	// Cancel the context so the consuming goroutine stops.
	cancel()
	// Wait for the consuming goroutine to exit.
	wg.Wait()
	if err = client.Close(); err != nil {
		return err
	}
	return nil
}
  212. func (this *kafkaClient) Close() error {
  213. // 不能随便关闭写的channel,如果关闭后还有写入会引发程序panic,需要控制写入(推荐不写入,只close通知)
  214. defer func() {
  215. close(this.done) // 标记关闭
  216. //close(this.msg) // 关闭消息chan
  217. //close(this.msgIsHandle) // 不能关闭,原因:因为消费过程中可能存在程序处理过程中,kafka close掉,此时处理完成往里面写数据,会panic
  218. close(this.isSetHandler) // 关闭,防止bug,导致消费的G一直阻塞
  219. }()
  220. if this.producer != nil {
  221. if err := this.producer.Close(); err != nil {
  222. return err
  223. }
  224. }
  225. return nil
  226. }
  227. //获取kafka消息
  228. func (this *kafkaClient) GetMessage() <-chan *sarama.ConsumerMessage {
  229. return this.msg
  230. }
  231. // 添加消息
  232. func (this *kafkaClient) AddMessage(message *sarama.ConsumerMessage) {
  233. this.msg <- message
  234. }
  235. // 通知消息处理完成
  236. func (this *kafkaClient) SetMessageIsHandle() {
  237. this.msgIsHandle <- 0
  238. }
  239. // 等待业务逻辑处理完成
  240. func (this *kafkaClient) WaitMessageIsHandle() <-chan int {
  241. return this.msgIsHandle
  242. }
  243. // Setup is run at the beginning of a new session, before ConsumeClaim
  244. func (this *kafkaClient) Setup(ss sarama.ConsumerGroupSession) error {
  245. logger.Infoc(this.ctx, "[Kafka] Consumer Setup ss:%+v", ss)
  246. // 关闭channel(注意不能重复关闭一个已经关闭的channel,所以需要重复实例化),通知已启动
  247. close(this.ready)
  248. return nil
  249. }
  250. // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
  251. func (this *kafkaClient) Cleanup(ss sarama.ConsumerGroupSession) error {
  252. logger.Infoc(this.ctx, "[Kafka] Consumer Cleanup ss:%+v", ss)
  253. return nil
  254. }
  255. // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
  256. func (this *kafkaClient) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  257. // NOTE:
  258. // Do not move the code below to a goroutine.
  259. // The `ConsumeClaim` itself is called within a goroutine, see:
  260. // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
  261. for message := range claim.Messages() {
  262. this.AddMessage(message)
  263. <-this.WaitMessageIsHandle()
  264. session.MarkMessage(message, "") // 必须标记不然不会提交偏移量
  265. }
  266. return nil
  267. }