package kafka

import (
	"context"
	"errors"
	"log"
	"os"
	"sync"
	"time"

	"git.shuncheng.lu/bigthing/gocommon/pkg/internal/application"
	"git.shuncheng.lu/bigthing/gocommon/pkg/logger"
	"github.com/Shopify/sarama"
)

type clientType uint8

const (
	producerClient clientType = iota + 1
	consumerClient
)

var (
	producerIsDoneError = errors.New("the kafka client is done, cannot produce messages")
)

type kafkaClientConfig struct {
	Host      []string
	GroupName string
	LogPath   string
	Topic     []string
}

var (
	// sarama logger; initialized once
	saramaLogger         sarama.StdLogger
	saramaLoggerInitLock sync.Mutex
)

type kafkaClient struct {
	// client type (producer or consumer)
	clientType clientType
	// context
	ctx context.Context
	// consumer: messages received here are handled by the business logic
	msg chan *sarama.ConsumerMessage
	// signals that a message has been handled
	msgIsHandle chan int
	// message handler; if none is set, consumption waits until one is set!
	handler HandlerKafkaMessage
	// channel used to wait for the handler to be set
	isSetHandler chan int
	// readiness signal, closed once consumption has started
	ready chan bool
	// closed once consumption has been shut down
	done chan bool
	// consumer topics
	consumerTopics []string
	// consumer group id
	groupName string
	// producer
	producer sarama.SyncProducer
	// brokers
	brokers []string
	// log file path
	logDirPath string
	// producer/consumer configuration
	config *sarama.Config
}

// newKafkaClient is the kafkaClient constructor.
func newKafkaClient(clientType clientType, kafkaClientConfig kafkaClientConfig) (*kafkaClient, error) {
	// initialize the sarama logger; this only needs to happen once
	err := func() error {
		saramaLoggerInitLock.Lock()
		defer saramaLoggerInitLock.Unlock()
		if saramaLogger == nil {
			file, err := os.OpenFile(kafkaClientConfig.LogPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
			if err != nil {
				return err
			}
			saramaLogger = log.New(file, "[Sarama] ", log.LstdFlags)
			sarama.Logger = saramaLogger
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}
	client := kafkaClient{
		clientType:   clientType,
		msg:          make(chan *sarama.ConsumerMessage),
		done:         make(chan bool),
		brokers:      kafkaClientConfig.Host,
		groupName:    kafkaClientConfig.GroupName,
		logDirPath:   kafkaClientConfig.LogPath,
		ctx:          application.NewContext(),
		isSetHandler: make(chan int),
		ready:        make(chan bool),
		msgIsHandle:  make(chan int),
	}
	configKafka := sarama.NewConfig()
	configKafka.Version = sarama.V0_11_0_1 // version running in production
	configKafka.ClientID = kafkaClientConfig.GroupName
	configKafka.Metadata.Full = false
	// the producer is not always needed (e.g. for a consumer), so branch on the client type
	switch clientType {
	case producerClient:
		configKafka.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack the message
		configKafka.Producer.Retry.Max = 10                   // retry up to 10 times to produce the message
		configKafka.Producer.Return.Successes = true
		configKafka.Producer.Compression = sarama.CompressionSnappy
		configKafka.Producer.Flush.Frequency = 500 * time.Millisecond
		kafkaProducer, err := sarama.NewSyncProducer(kafkaClientConfig.Host, configKafka)
		if err != nil {
			return nil, err
		}
		client.producer = kafkaProducer
	case consumerClient:
		configKafka.Consumer.Offsets.Initial = sarama.OffsetNewest
		configKafka.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
		client.consumerTopics = kafkaClientConfig.Topic
	default:
		return nil, errors.New("kafka client type not supported")
	}
	client.config = configKafka
	logger.Infoc(client.ctx, "[Kafka] new kafka client success, info=%+v", client)
	return &client, nil
}

// ProducerMsg produces a message without a key.
func (this *kafkaClient) ProducerMsg(ctx context.Context, topic, message string) error {
	return this.Producer(ctx, topic, message, "")
}

// Producer produces a message, optionally with a key.
func (this *kafkaClient) Producer(ctx context.Context, topic, message string, key string) error {
	select {
	case <-this.done:
		// the client has been closed, so report an error
		logger.Errorc(ctx, "[Kafka] produce message err, topic: %s, message: %s, err: %v", topic, message, producerIsDoneError)
		return producerIsDoneError
	default:
		// otherwise the message can be sent
		msg := sarama.ProducerMessage{
			Topic:     topic,
			Timestamp: time.Now(),
		}
		msg.Value = sarama.StringEncoder(message)
		// setting a key skews partition distribution; leaving it empty is recommended
		if key != "" {
			msg.Key = sarama.StringEncoder(key)
		}
		partition, offset, err := this.producer.SendMessage(&msg)
		if err != nil {
			logger.Errorc(ctx, "[Kafka] produce message err, topic: %s, message: %s, partition: %d, offset: %d, key:%s, err: %v", topic, message, partition, offset, key, err)
			return err
		}
		logger.Infoc(ctx, "[Kafka] produce message success, topic: %s, message: %s, partition: %d, offset: %d, key:%s", topic, message, partition, offset, key)
		return nil
	}
}
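// Illustrative usage sketch, not part of the original file: how a caller inside
// this package might construct a producer client and send a message. The broker
// address, group name, log path, and topic below are assumptions for the example.
func exampleProduce() error {
	client, err := newKafkaClient(producerClient, kafkaClientConfig{
		Host:      []string{"127.0.0.1:9092"}, // hypothetical broker
		GroupName: "example-group",            // hypothetical group / client id
		LogPath:   "/tmp/sarama.log",          // hypothetical sarama log file
	})
	if err != nil {
		return err
	}
	defer client.Close()
	// ProducerMsg sends without a key, so sarama spreads messages across partitions
	return client.ProducerMsg(context.Background(), "example-topic", "hello kafka")
}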
// Consumer starts the kafka consumer.
func (this *kafkaClient) Consumer() error {
	client, err := sarama.NewConsumerGroup(this.brokers, this.groupName, this.config)
	if err != nil {
		return err
	}
	logger.Infoc(this.ctx, "[Kafka] new consumer group success")
	wg := &sync.WaitGroup{}
	wg.Add(1)
	ctx, cancel := context.WithCancel(this.ctx)
	go func() {
		// wait until the handler has been set
		<-this.isSetHandler
		// every goroutine needs a recover to prevent a global panic
		defer func() {
			wg.Done()
			if err := recover(); err != nil {
				logger.Errorc(ctx, "[Kafka] consume panic, err:%v", err)
			}
		}()
		for {
			logger.Infoc(ctx, "[Kafka] start consume")
			/**
			Note: once ctx has been cancelled, this loop would otherwise spin
			forever, because after cancel the channel is closed and Done() fires
			on every iteration. So we must check ctx.Err() and break out of the loop!
			*/
			if err := client.Consume(ctx, this.consumerTopics, this); err != nil {
				// a rebalance does not surface here as an error
				logger.Errorc(ctx, "[kafka consumer] err from consumer:%s", err.Error())
				return
			}
			if ctx.Err() != nil {
				logger.Errorc(ctx, "[Kafka consumer] client.Consume err:%v", ctx.Err())
				return
			}
			// re-create ready on every iteration: Setup closes it, and writing to
			// (or re-closing) an already-closed channel would panic
			this.ready = make(chan bool)
		}
	}()
	<-this.ready // await till the consumer has been set up
	logger.Infoc(this.ctx, "[Kafka] consumer up and running !")
	// listen on done: it fires when Close is called manually
	select {
	// listening on ctx makes cancellation work when the context comes from outside
	case <-ctx.Done():
		logger.Infoc(this.ctx, "[Kafka] consumer terminating: context cancelled")
	// triggered by a Close call
	case <-this.done:
		logger.Infoc(this.ctx, "[Kafka] consumer terminating: via signal")
	}
	// todo issue: 1. if client.Close() already stopped consumption, calling cancel() to notify the consumer is redundant
	// cancel the ctx to stop the consuming goroutine
	cancel()
	// wait for the goroutine to exit
	wg.Wait()
	if err = client.Close(); err != nil {
		return err
	}
	return nil
}

// Close shuts the client down.
func (this *kafkaClient) Close() error {
	// never casually close a channel that is written to: a write after close panics,
	// so writers must be controlled (preferably: never write, only close to notify)
	defer func() {
		close(this.done) // mark the client as closed
		//close(this.msg) // close the message chan
		//close(this.msgIsHandle) // must NOT be closed: a message may still be mid-processing when kafka is closed, and writing the completion signal afterwards would panic
		close(this.isSetHandler) // close to keep the consuming goroutine from blocking forever
	}()
	if this.producer != nil {
		if err := this.producer.Close(); err != nil {
			return err
		}
	}
	return nil
}

// GetMessage exposes the kafka message channel.
func (this *kafkaClient) GetMessage() <-chan *sarama.ConsumerMessage {
	return this.msg
}

// AddMessage hands a message over to the business logic.
func (this *kafkaClient) AddMessage(message *sarama.ConsumerMessage) {
	this.msg <- message
}

// SetMessageIsHandle signals that a message has been handled.
func (this *kafkaClient) SetMessageIsHandle() {
	this.msgIsHandle <- 0
}

// WaitMessageIsHandle waits for the business logic to finish handling a message.
func (this *kafkaClient) WaitMessageIsHandle() <-chan int {
	return this.msgIsHandle
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (this *kafkaClient) Setup(ss sarama.ConsumerGroupSession) error {
	logger.Infoc(this.ctx, "[Kafka] Consumer Setup ss:%+v", ss)
	// close the channel to signal startup (an already-closed channel must not be
	// closed again, which is why it is re-created for each session)
	close(this.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (this *kafkaClient) Cleanup(ss sarama.ConsumerGroupSession) error {
	logger.Infoc(this.ctx, "[Kafka] Consumer Cleanup ss:%+v", ss)
	return nil
}
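// Illustrative usage sketch, not part of the original file: wiring up the consumer.
// It assumes the message handler has already been registered through a setter
// defined elsewhere in the package (Consumer blocks on isSetHandler until then),
// and shows one way to drive the exported channel API: read from GetMessage,
// process, then call SetMessageIsHandle so ConsumeClaim can commit the offset.
func exampleConsume() {
	client, err := newKafkaClient(consumerClient, kafkaClientConfig{
		Host:      []string{"127.0.0.1:9092"}, // hypothetical broker
		GroupName: "example-group",            // hypothetical consumer group
		LogPath:   "/tmp/sarama.log",          // hypothetical sarama log file
		Topic:     []string{"example-topic"},  // hypothetical topic
	})
	if err != nil {
		return
	}
	// business-logic loop; note that Close deliberately never closes the msg
	// channel, so this goroutine simply stays blocked after shutdown
	go func() {
		for message := range client.GetMessage() {
			logger.Infoc(client.ctx, "[Kafka] handled message, offset: %d", message.Offset)
			client.SetMessageIsHandle() // unblocks WaitMessageIsHandle in ConsumeClaim
		}
	}()
	// blocks until Close is called or the context is cancelled
	if err := client.Consumer(); err != nil {
		logger.Errorc(client.ctx, "[Kafka] consumer exited with err: %v", err)
	}
}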
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (this *kafkaClient) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		this.AddMessage(message)
		<-this.WaitMessageIsHandle()
		session.MarkMessage(message, "") // must be marked, otherwise the offset is never committed
	}
	return nil
}
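// Illustrative shutdown sketch, not part of the original file: calling Close
// closes the done channel, Consumer's select fires and cancels its context,
// sarama then closes claim.Messages(), ConsumeClaim returns, and the consumer
// group is closed. The ten-second delay is an arbitrary assumption.
func exampleShutdown(client *kafkaClient) {
	time.AfterFunc(10*time.Second, func() {
		if err := client.Close(); err != nil {
			logger.Errorc(client.ctx, "[Kafka] close err: %v", err)
		}
	})
}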