package kafka

import (
	"context"
	"errors"
	"log"
	"os"
	"sync"
	"time"

	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/application"
	"git.shuncheng.lu/bigthing/gocommon/pkg/logger"
	"github.com/Shopify/sarama"
)

type clientType uint8

const (
	producerClient clientType = iota + 1
	consumerClient
)

var (
	// errProducerIsDone is returned when producing on a closed client.
	errProducerIsDone = errors.New("the kafka client is closed, cannot produce messages")
)

type kafkaClientConfig struct {
	// broker addresses
	Host []string
	// consumer group name (also used as the sarama ClientID)
	GroupName string
	// path of the sarama log file
	LogPath string
	// topics to consume (consumer only)
	Topic []string
}

var (
	// sarama logger, initialized once
	saramaLogger         sarama.StdLogger
	saramaLoggerInitLock sync.Mutex
)

type kafkaClient struct {
	// client type (producer or consumer)
	clientType clientType
	// context carried through logging
	ctx context.Context
	// consumed messages, handed to the business logic for processing
	msg chan *sarama.ConsumerMessage
	// signals that the current message has been handled
	msgIsHandle chan int
	// message handler; consumption waits until one has been set!
	handler HandlerKafkaMessage
	// channel implementing that wait
	isSetHandler chan int
	// closed once the consumer session is set up and consumption starts
	ready chan bool
	// closed when the client has been shut down
	done chan bool
	// topics this consumer subscribes to
	consumerTopics []string
	// consumer group id
	groupName string
	// producer
	producer sarama.SyncProducer
	// broker addresses
	brokers []string
	// log file path
	logDirPath string
	// sarama configuration shared by producer/consumer
	config *sarama.Config
}

// newKafkaClient constructs a kafkaClient of the given type.
func newKafkaClient(clientType clientType, kafkaClientConfig kafkaClientConfig) (*kafkaClient, error) {
	// Initialize the sarama logger; this only needs to happen once.
	err := func() error {
		saramaLoggerInitLock.Lock()
		defer saramaLoggerInitLock.Unlock()
		if saramaLogger == nil {
			file, err := os.OpenFile(kafkaClientConfig.LogPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
			if err != nil {
				return err
			}
			saramaLogger = log.New(file, "[Sarama] ", log.LstdFlags)
			sarama.Logger = saramaLogger
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}
	client := kafkaClient{
		clientType:   clientType,
		msg:          make(chan *sarama.ConsumerMessage),
		done:         make(chan bool),
		brokers:      kafkaClientConfig.Host,
		groupName:    kafkaClientConfig.GroupName,
		logDirPath:   kafkaClientConfig.LogPath,
		ctx:          application.NewContext(),
		isSetHandler: make(chan int),
		ready:        make(chan bool),
		msgIsHandle:  make(chan int),
	}
	configKafka := sarama.NewConfig()
	configKafka.Version = sarama.V0_11_0_1 // version running in production
	configKafka.ClientID = kafkaClientConfig.GroupName
	configKafka.Metadata.Full = false
	// Only set up what the client type needs: a consumer, for example, has no producer.
	switch clientType {
	case producerClient:
		configKafka.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack the message
		configKafka.Producer.Retry.Max = 10                   // retry up to 10 times to produce the message
		configKafka.Producer.Return.Successes = true
		configKafka.Producer.Compression = sarama.CompressionSnappy
		configKafka.Producer.Flush.Frequency = 500 * time.Millisecond
		kafkaProducer, err := sarama.NewSyncProducer(kafkaClientConfig.Host, configKafka)
		if err != nil {
			return nil, err
		}
		client.producer = kafkaProducer
	case consumerClient:
		configKafka.Consumer.Offsets.Initial = sarama.OffsetNewest
		configKafka.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
		client.consumerTopics = kafkaClientConfig.Topic
	default:
		return nil, errors.New("kafka: unsupported client type")
	}
	client.config = configKafka
	logger.Infoc(client.ctx, "[Kafka] new kafka client success, info=%+v", client)
	return &client, nil
}

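// Construction sketch (illustrative only; the broker address, group name, log
// path, and topic below are placeholders, not values from this package):
//
//	consumer, err := newKafkaClient(consumerClient, kafkaClientConfig{
//		Host:      []string{"127.0.0.1:9092"},
//		GroupName: "example-group",
//		LogPath:   "/tmp/sarama.log",
//		Topic:     []string{"example-topic"},
//	})
//	if err != nil {
//		// handle error
//	}
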
// ProducerMsg sends a message without a key.
func (c *kafkaClient) ProducerMsg(ctx context.Context, topic, message string) error {
	return c.Producer(ctx, topic, message, "")
}

// Producer sends a message, optionally with a key.
func (c *kafkaClient) Producer(ctx context.Context, topic, message string, key string) error {
	select {
	case <-c.done: // the client is closed, report an error
		logger.Errorc(ctx, "[Kafka] produce message err, topic: %s, message: %s, err: %v", topic, message, errProducerIsDone)
		return errProducerIsDone
	default:
		// otherwise the message can be sent
		msg := sarama.ProducerMessage{
			Topic:     topic,
			Timestamp: time.Now(),
		}
		msg.Value = sarama.StringEncoder(message)
		// Setting a key skews partition distribution; leaving it empty is recommended.
		if key != "" {
			msg.Key = sarama.StringEncoder(key)
		}
		partition, offset, err := c.producer.SendMessage(&msg)
		if err != nil {
			logger.Errorc(ctx, "[Kafka] produce message err, topic: %s, message: %s, partition: %d, offset: %d, key: %s, err: %v", topic, message, partition, offset, key, err)
			return err
		}
		logger.Infoc(ctx, "[Kafka] produce message success, topic: %s, message: %s, partition: %d, offset: %d, key: %s", topic, message, partition, offset, key)
		return nil
	}
}

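// Producing sketch (illustrative only; assumes a client built with
// producerClient, and a placeholder topic and key):
//
//	if err := producer.ProducerMsg(context.Background(), "example-topic", "hello"); err != nil {
//		// handle error
//	}
//	// With a key (pins the message to one partition, so distribution can skew):
//	_ = producer.Producer(context.Background(), "example-topic", "hello", "user-42")
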
// Consumer joins the consumer group and blocks, consuming until the context is
// cancelled or Close is called.
func (c *kafkaClient) Consumer() error {
	client, err := sarama.NewConsumerGroup(c.brokers, c.groupName, c.config)
	if err != nil {
		return err
	}
	logger.Infoc(c.ctx, "[Kafka] new consumer group success")
	wg := &sync.WaitGroup{}
	wg.Add(1)
	ctx, cancel := context.WithCancel(c.ctx)
	go func() {
		// wait until a handler has been set
		<-c.isSetHandler
		// every goroutine must recover, to keep a panic from crashing the process
		defer func() {
			wg.Done()
			if err := recover(); err != nil {
				logger.Errorc(ctx, "[Kafka] consume panic, err:%v", err)
			}
		}()
		for {
			logger.Infoc(ctx, "[Kafka] start consume")
			/**
			Note: once ctx has been cancelled, this loop would otherwise spin forever,
			because cancel closes the Done channel and every iteration would see Done().
			So we must check ctx.Err() and break out of the loop!
			*/
			if err := client.Consume(ctx, c.consumerTopics, c); err != nil { // a rebalance does not surface as an error here
				logger.Errorc(ctx, "[Kafka consumer] err from consumer:%s", err.Error())
				return
			}
			if ctx.Err() != nil {
				logger.Errorc(ctx, "[Kafka consumer] client.Consume err:%v", ctx.Err())
				return
			}
			// Re-create the channel every iteration: Setup closes it, and writing to
			// (or re-closing) a closed channel would panic.
			c.ready = make(chan bool)
		}
	}()
	<-c.ready // await until the consumer has been set up
	logger.Infoc(c.ctx, "[Kafka] consumer up and running!")
	// Block until shutdown is requested.
	select {
	// fires when the context is cancelled, e.g. by an outer caller
	case <-ctx.Done():
		logger.Infoc(c.ctx, "[Kafka] consumer terminating: context cancelled")
	// fires when Close is called manually
	case <-c.done:
		logger.Infoc(c.ctx, "[Kafka] consumer terminating: via signal")
	}
	// TODO: if consumption already stopped because of client.Close(), calling cancel() to notify it is redundant.
	// Cancel the context to stop the consuming goroutine.
	cancel()
	// Wait for the goroutine to exit.
	wg.Wait()
	if err = client.Close(); err != nil {
		return err
	}
	return nil
}

// Close shuts down the client.
func (c *kafkaClient) Close() error {
	// Never close a channel that is still being written to: a write after close
	// panics. Control the writers (preferably never write, close only to signal).
	defer func() {
		close(c.done) // mark the client as closed
		//close(c.msg) // do not close the message chan
		//close(c.msgIsHandle) // must not be closed: a message may still be mid-processing when kafka closes, and writing the completion signal afterwards would panic
		close(c.isSetHandler) // close it, so the consuming goroutine cannot block forever waiting for a handler
	}()
	if c.producer != nil {
		if err := c.producer.Close(); err != nil {
			return err
		}
	}
	return nil
}

// GetMessage returns the channel of consumed messages.
func (c *kafkaClient) GetMessage() <-chan *sarama.ConsumerMessage {
	return c.msg
}

// AddMessage hands a consumed message to the business logic.
func (c *kafkaClient) AddMessage(message *sarama.ConsumerMessage) {
	c.msg <- message
}

// SetMessageIsHandle signals that the current message has been handled.
func (c *kafkaClient) SetMessageIsHandle() {
	c.msgIsHandle <- 0
}

// WaitMessageIsHandle blocks until the business logic reports completion.
func (c *kafkaClient) WaitMessageIsHandle() <-chan int {
	return c.msgIsHandle
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *kafkaClient) Setup(ss sarama.ConsumerGroupSession) error {
	logger.Infoc(c.ctx, "[Kafka] Consumer Setup ss:%+v", ss)
	// Close the channel to signal startup (a channel must not be closed twice,
	// which is why it is re-created before every Consume call).
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (c *kafkaClient) Cleanup(ss sarama.ConsumerGroupSession) error {
	logger.Infoc(c.ctx, "[Kafka] Consumer Cleanup ss:%+v", ss)
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *kafkaClient) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		c.AddMessage(message)
		<-c.WaitMessageIsHandle()
		session.MarkMessage(message, "") // must mark the message, otherwise its offset is never committed
	}
	return nil
}
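
// Consumption wiring sketch (illustrative only). It assumes the handler has
// been registered elsewhere in this package so that isSetHandler is signalled;
// that setter is not part of this file. Every received message must be answered
// with SetMessageIsHandle, or ConsumeClaim blocks and no offset is committed:
//
//	go func() {
//		for msg := range consumer.GetMessage() {
//			// ... business logic for msg ...
//			consumer.SetMessageIsHandle() // release WaitMessageIsHandle in ConsumeClaim
//		}
//	}()
//	if err := consumer.Consumer(); err != nil {
//		// handle error
//	}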